This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

remove unused metrics
aarshkshah1992 committed Sep 4, 2023
1 parent 9989d73 commit 35646a8
Showing 4 changed files with 10 additions and 253 deletions.
103 changes: 7 additions & 96 deletions fetcher.go
@@ -49,7 +49,6 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
}
// if the context is already cancelled, there's nothing we can do here.
if ce := ctx.Err(); ce != nil {
- fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), "fetchResource-init").Add(1)
return ce
}

@@ -117,19 +116,14 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
ttfbMs = fb.Sub(start).Milliseconds()

cacheStatus := getCacheStatus(isCacheHit)
- if isBlockRequest {
- fetchSizeBlockMetric.Observe(float64(received))
- } else {
+ if !isBlockRequest {
fetchSizeCarMetric.WithLabelValues("success").Observe(float64(received))
}
durationMs := response_success_end.Sub(start).Milliseconds()
fetchSpeedPerPeerSuccessMetric.WithLabelValues(resourceType, cacheStatus).Observe(float64(received) / float64(durationMs))
fetchCacheCountSuccessTotalMetric.WithLabelValues(resourceType, cacheStatus).Add(1)
// track individual block metrics separately
- if isBlockRequest {
- fetchTTFBPerBlockPerPeerSuccessMetric.WithLabelValues(cacheStatus).Observe(float64(ttfbMs))
- fetchDurationPerBlockPerPeerSuccessMetric.WithLabelValues(cacheStatus).Observe(float64(response_success_end.Sub(start).Milliseconds()))
- } else {
+ if !isBlockRequest {
ci := 0
for index, value := range carSizes {
if float64(received) < value {
@@ -142,20 +136,11 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
fetchTTFBPerCARPerPeerSuccessMetric.WithLabelValues(cacheStatus, carSizeStr).Observe(float64(ttfbMs))
fetchDurationPerCarPerPeerSuccessMetric.WithLabelValues(cacheStatus).Observe(float64(response_success_end.Sub(start).Milliseconds()))
}
-
- // update L1 server timings
- updateSuccessServerTimingMetrics(respHeader.Values(servertiming.HeaderKey), resourceType, isCacheHit, durationMs, ttfbMs, received)
} else {
- if isBlockRequest {
- fetchDurationPerBlockPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
- } else {
+ if !isBlockRequest {
fetchDurationPerCarPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
fetchSizeCarMetric.WithLabelValues("failure").Observe(float64(received))
}
-
- if code == http.StatusBadGateway || code == http.StatusGatewayTimeout {
- updateLassie5xxTime(respHeader.Values(servertiming.HeaderKey), resourceType)
- }
}

if err == nil || !errors.Is(err, context.Canceled) {
@@ -195,7 +180,7 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
if err != nil {
if recordIfContextErr(resourceType, reqCtx, "build-http-request") {
if isCtxError(reqCtx) {
return reqCtx.Err()
}
return err
@@ -219,11 +204,10 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m

var resp *http.Response
saturnCallsTotalMetric.WithLabelValues(resourceType).Add(1)
- startReq := time.Now()

resp, err = p.config.Client.Do(req)
if err != nil {
if recordIfContextErr(resourceType, reqCtx, "send-http-request") {
if isCtxError(reqCtx) {
if errors.Is(err, context.Canceled) {
return reqCtx.Err()
}
@@ -239,7 +223,6 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
}

respHeader = resp.Header
- headerTTFBPerPeerMetric.WithLabelValues(resourceType, getCacheStatus(respHeader.Get(saturnCacheHitKey) == saturnCacheHit)).Observe(float64(time.Since(startReq).Milliseconds()))

defer resp.Body.Close()

@@ -303,7 +286,7 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
_, _ = io.Copy(io.Discard, resp.Body)

if err != nil {
if recordIfContextErr(resourceType, reqCtx, "read-http-response") {
if isCtxError(reqCtx) {
if errors.Is(err, context.Canceled) {
return reqCtx.Err()
}
@@ -324,92 +307,20 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m

- // trace-metrics
- // request life-cycle metrics
-
- fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "tcp_connection").Observe(float64(result.TCPConnection.Milliseconds()))
- fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "tls_handshake").Observe(float64(result.TLSHandshake.Milliseconds()))
- fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "wait_after_request_sent_for_header").Observe(float64(result.ServerProcessing.Milliseconds()))
-
- fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "name_lookup").
- Observe(float64(result.NameLookup.Milliseconds()))
- fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "connect").
- Observe(float64(result.Connect.Milliseconds()))
- fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "pre_transfer").
- Observe(float64(result.Pretransfer.Milliseconds()))
- fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "start_transfer").
- Observe(float64(result.StartTransfer.Milliseconds()))
-
saturnCallsSuccessTotalMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit)).Add(1)

from.RecordSuccess(start, float64(wrapped.firstByte.Sub(start).Milliseconds()), float64(received)/float64(response_success_end.Sub(start).Milliseconds()))

return nil
}

- func recordIfContextErr(resourceType string, ctx context.Context, requestState string) bool {
+ func isCtxError(ctx context.Context) bool {
if ce := ctx.Err(); ce != nil {
- fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), requestState).Add(1)
return true
}
return false
}

- func updateLassie5xxTime(timingHeaders []string, resourceType string) {
- if len(timingHeaders) == 0 {
- goLogger.Debug("no timing headers in request response.")
- return
- }
-
- for _, th := range timingHeaders {
- if subReqTiming, err := servertiming.ParseHeader(th); err == nil {
- for _, m := range subReqTiming.Metrics {
- switch m.Name {
- case "shim_lassie_headers":
- if m.Duration.Milliseconds() != 0 {
- lassie5XXTimeMetric.WithLabelValues(resourceType).Observe(float64(m.Duration.Milliseconds()))
- }
- return
- default:
- }
- }
- }
- }
- }

- // todo: refactor for dryness
- func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType string, isCacheHit bool, totalTimeMs, ttfbMs int64, recieved int) {
- if len(timingHeaders) == 0 {
- goLogger.Debug("no timing headers in request response.")
- return
- }
-
- for _, th := range timingHeaders {
- if subReqTiming, err := servertiming.ParseHeader(th); err == nil {
- for _, m := range subReqTiming.Metrics {
- switch m.Name {
- case "shim_lassie_headers":
- if m.Duration.Milliseconds() != 0 && !isCacheHit {
- fetchDurationPerPeerSuccessCacheMissTotalLassieMetric.WithLabelValues(resourceType).Observe(float64(m.Duration.Milliseconds()))
- }
-
- case "nginx":
- // sanity checks
- if totalTimeMs != 0 && ttfbMs != 0 && m.Duration.Milliseconds() != 0 {
- fetchDurationPerPeerSuccessTotalL1NodeMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit)).Observe(float64(m.Duration.Milliseconds()))
- networkTimeMs := totalTimeMs - m.Duration.Milliseconds()
- if networkTimeMs > 0 {
- s := float64(recieved) / float64(networkTimeMs)
- fetchNetworkSpeedPerPeerSuccessMetric.WithLabelValues(resourceType).Observe(s)
- }
- networkLatencyMs := ttfbMs - m.Duration.Milliseconds()
- fetchNetworkLatencyPeerSuccessMetric.WithLabelValues(resourceType).Observe(float64(networkLatencyMs))
- }
- default:
- }
- }
- }
- }
- }

func getCacheStatus(isCacheHit bool) string {
if isCacheHit {
return "Cache-hit"
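The heart of the fetcher.go change is the helper swap: recordIfContextErr both checked the context and bumped fetchRequestContextErrorTotalMetric, while the new isCtxError only checks. A minimal standalone sketch of the new guard pattern follows; the main function and the cancelled-context setup are illustrative assumptions, not code from this repository.

package main

import (
	"context"
	"errors"
	"fmt"
)

// isCtxError reports whether ctx has already been cancelled or timed out.
// Unlike the old recordIfContextErr, it records no metrics.
func isCtxError(ctx context.Context) bool {
	return ctx.Err() != nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate a caller that has already given up

	// Each request stage guards itself and surfaces ctx.Err(), so callers
	// observe context.Canceled rather than a wrapped transport error.
	if isCtxError(ctx) {
		fmt.Println("aborting:", errors.Is(ctx.Err(), context.Canceled)) // aborting: true
	}
}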
127 changes: 1 addition & 126 deletions metrics.go
@@ -31,22 +31,13 @@ var (
)

var (
- // size buckets from 256 KiB (default chunk in Kubo) to 4MiB (maxBlockSize), 256 KiB wide each
- // histogram buckets will be [256KiB, 512KiB, 768KiB, 1MiB, ... 4MiB] -> total 16 buckets +1 prometheus Inf bucket
- blockSizeHistogram = prometheus.LinearBuckets(262144, 262144, 16)
-
- // TODO: Speed max bucket could use some further refinement,
- // for now we don't expect speed being bigger than transfering 4MiB (max block) in 500ms
- // histogram buckets will be [1byte/milliseconds, ... 8387 bytes/milliseconds] -> total 20 buckets +1 prometheus Inf bucket
- speedBytesPerMsHistogram = prometheus.ExponentialBucketsRange(1, 4194304/500, 20)
-
// ----- Histogram buckets to record fetch duration metrics -----
// The upper bound on the fetch duration buckets are informed by the timeouts per block and per peer request/retry.

// buckets to record duration in milliseconds to fetch a block,
// histogram buckets will be [50ms,.., 60 seconds] -> total 20 buckets +1 prometheus Inf bucket
durationMsPerBlockHistogram = prometheus.ExponentialBucketsRange(50, 60000, 20)

// buckets to record duration in milliseconds to fetch a CAR,
// histogram buckets will be [50ms,.., 30 minutes] -> total 40 buckets +1 prometheus Inf bucket
durationMsPerCarHistogram = prometheus.ExponentialBucketsRange(50, 1800000, 40)
@@ -55,7 +46,7 @@
var (
fetchResponseCodeMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_response_code"),
Help: "Response codes observed during caboose fetches for a block",
Help: "Response codes observed during caboose fetches",
}, []string{"resourceType", "code"})

// success cases
@@ -71,46 +62,6 @@ var (
}, []string{"resourceType", "cache_status"})
)

- // block metrics
- var (
- fetchDurationPerBlockPerPeerSuccessMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_peer_success"),
- Help: "Latency observed during successful caboose fetches from a single peer in milliseconds",
- Buckets: durationMsPerBlockHistogram,
- }, []string{"cache_status"})
-
- fetchDurationBlockSuccessMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
- Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_success"),
- Help: "Latency observed during successful caboose fetches for a block across multiple peers and retries in milliseconds",
- Buckets: durationMsPerBlockHistogram,
- })
-
- fetchTTFBPerBlockPerPeerSuccessMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_ttfb_block_peer_success"),
- Help: "TTFB observed during a successful caboose fetch from a single peer in milliseconds",
- Buckets: durationMsPerBlockHistogram,
- }, []string{"cache_status"})
-
- // failures
- fetchDurationPerBlockPerPeerFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
- Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_peer_failure"),
- Help: "Latency observed during failed caboose fetches from a single peer in milliseconds",
- Buckets: durationMsPerBlockHistogram,
- })
-
- fetchDurationBlockFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
- Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_failure"),
- Help: "Latency observed during failed caboose fetches for a block across multiple peers and retries in milliseconds",
- Buckets: durationMsPerBlockHistogram,
- })
-
- fetchSizeBlockMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
- Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_size_block"),
- Help: "Size in bytes of caboose block fetches",
- Buckets: blockSizeHistogram,
- })
- )
-
// CAR metrics
var (
fetchDurationPerCarPerPeerSuccessMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
@@ -131,11 +82,6 @@
Buckets: durationMsPerCarHistogram,
}, []string{"cache_status", "car_size"})

- headerTTFBPerPeerMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Name: prometheus.BuildFQName("ipfs", "caboose", "header_ttfb_peer"),
- Buckets: durationMsPerCarHistogram,
- }, []string{"resourceType", "cache_status"})
-
// failure
fetchDurationPerCarPerPeerFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_car_peer_failure"),
@@ -156,57 +102,10 @@
}, []string{"error_status"})
)

- // Saturn Server-timings
- var (
- // ---------------------- For successful fetches ONLY for now----------------------
- // L1 server timings
- // nginx + l1 compute + lassie
- fetchDurationPerPeerSuccessTotalL1NodeMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_peer_total_saturn_l1"),
- Help: "Total time spent on an L1 node for a successful fetch per peer in milliseconds",
- Buckets: durationMsPerCarHistogram,
- }, []string{"resourceType", "cache_status"})
-
- // total only on lassie
- fetchDurationPerPeerSuccessCacheMissTotalLassieMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_peer_cache_miss_total_lassie"),
- Help: "Time spent in Lassie for a Saturn L1 Nginx cache miss for a successful fetch per peer in milliseconds",
- Buckets: durationMsPerCarHistogram,
- }, []string{"resourceType"})
-
- lassie5XXTimeMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_5xx_total_lassie"),
- Help: "Time spent in Lassie for a Saturn L1 Nginx cache miss for a 5xx in milliseconds",
- Buckets: durationMsPerCarHistogram,
- }, []string{"resourceType"})
-
- // network timing
- fetchNetworkSpeedPerPeerSuccessMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_network_speed_peer_success"),
- Help: "Network speed observed during successful caboose fetches from a single peer in bytes per milliseconds",
- Buckets: speedBytesPerMsHistogram,
- }, []string{"resourceType"})
-
- fetchNetworkLatencyPeerSuccessMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_network_latency_peer_success"),
- Help: "Network latency observed during successful caboose fetches from a single peer in milliseconds",
- Buckets: durationMsPerCarHistogram,
- }, []string{"resourceType"})
- )
-
var (
fetchCalledTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_called_total"),
}, []string{"resourceType"})

fetchRequestContextErrorTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_request_context_error_total"),
}, []string{"resourceType", "errorType", "requestStage"})

fetchRequestSuccessTimeTraceMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_request_success_time_trace"),
Buckets: durationMsPerCarHistogram,
}, []string{"resourceType", "cache_status", "lifecycleStage"})
)

var (
@@ -238,47 +137,23 @@ func init() {
CabooseMetrics.MustRegister(poolRefreshErrorMetric)
CabooseMetrics.MustRegister(poolSizeMetric)
CabooseMetrics.MustRegister(poolNewMembersMetric)
CabooseMetrics.MustRegister(poolRemovedFailureTotalMetric)
CabooseMetrics.MustRegister(poolRemovedConnFailureTotalMetric)
CabooseMetrics.MustRegister(poolRemovedReadFailureTotalMetric)
CabooseMetrics.MustRegister(poolRemovedNon2xxTotalMetric)
CabooseMetrics.MustRegister(poolMembersNotAddedBecauseRemovedMetric)
CabooseMetrics.MustRegister(poolMembersRemovedAndAddedBackMetric)
CabooseMetrics.MustRegister(poolEnoughObservationsForMainSetDurationMetric)
CabooseMetrics.MustRegister(poolTierChangeMetric)

CabooseMetrics.MustRegister(fetchResponseCodeMetric)
CabooseMetrics.MustRegister(fetchSpeedPerPeerSuccessMetric)

- CabooseMetrics.MustRegister(fetchDurationPerBlockPerPeerSuccessMetric)
CabooseMetrics.MustRegister(fetchDurationPerCarPerPeerSuccessMetric)
- CabooseMetrics.MustRegister(fetchDurationPerBlockPerPeerFailureMetric)
CabooseMetrics.MustRegister(fetchDurationPerCarPerPeerFailureMetric)
- CabooseMetrics.MustRegister(fetchDurationBlockSuccessMetric)
CabooseMetrics.MustRegister(fetchDurationCarSuccessMetric)
- CabooseMetrics.MustRegister(fetchDurationBlockFailureMetric)
CabooseMetrics.MustRegister(fetchDurationCarFailureMetric)
- CabooseMetrics.MustRegister(fetchTTFBPerBlockPerPeerSuccessMetric)
CabooseMetrics.MustRegister(fetchTTFBPerCARPerPeerSuccessMetric)
- CabooseMetrics.MustRegister(headerTTFBPerPeerMetric)

CabooseMetrics.MustRegister(fetchCacheCountSuccessTotalMetric)
-
- CabooseMetrics.MustRegister(fetchDurationPerPeerSuccessTotalL1NodeMetric)
- CabooseMetrics.MustRegister(fetchDurationPerPeerSuccessCacheMissTotalLassieMetric)
- CabooseMetrics.MustRegister(lassie5XXTimeMetric)
-
- CabooseMetrics.MustRegister(fetchNetworkSpeedPerPeerSuccessMetric)
- CabooseMetrics.MustRegister(fetchNetworkLatencyPeerSuccessMetric)
-
CabooseMetrics.MustRegister(m_collector{&peerLatencyDistribution})

CabooseMetrics.MustRegister(fetchSizeCarMetric)
- CabooseMetrics.MustRegister(fetchSizeBlockMetric)

- CabooseMetrics.MustRegister(fetchRequestContextErrorTotalMetric)
CabooseMetrics.MustRegister(fetchCalledTotalMetric)
- CabooseMetrics.MustRegister(fetchRequestSuccessTimeTraceMetric)

CabooseMetrics.MustRegister(saturnCallsTotalMetric)
CabooseMetrics.MustRegister(saturnCallsFailureTotalMetric)
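For readers skimming the retained definitions above: the bucket helpers pre-compute histogram boundaries once, and every metric must still be registered before it is scraped. A self-contained sketch using the same helpers; the metric name and label value below are illustrative assumptions, not the repository's.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	// 40 exponentially spaced buckets from 50ms to 30 minutes, the same
	// shape as durationMsPerCarHistogram above.
	carDurationBuckets = prometheus.ExponentialBucketsRange(50, 1800000, 40)

	demoFetchDurationMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    prometheus.BuildFQName("ipfs", "caboose", "demo_fetch_duration"),
		Help:    "Example fetch duration histogram in milliseconds.",
		Buckets: carDurationBuckets,
	}, []string{"cache_status"})
)

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(demoFetchDurationMetric) // panics on duplicate registration
	demoFetchDurationMetric.WithLabelValues("Cache-hit").Observe(1234)
	fmt.Printf("bucket range: %.0fms .. %.0fms\n", carDurationBuckets[0], carDurationBuckets[len(carDurationBuckets)-1])
}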
4 changes: 2 additions & 2 deletions pool.go
@@ -275,7 +275,7 @@ func (p *pool) updateFetchKeyCoolDown(key string) {

func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallback, with string) (err error) {
fetchCalledTotalMetric.WithLabelValues(resourceTypeCar).Add(1)
if recordIfContextErr(resourceTypeCar, ctx, "fetchResourceWith") {
if isCtxError(ctx) {
return ctx.Err()
}

@@ -312,7 +312,7 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallback, with string) (err error) {

pq := []string{path}
for i := 0; i < len(nodes); i++ {
if recordIfContextErr(resourceTypeCar, ctx, "fetchResourceWithLoop") {
if isCtxError(ctx) {
return ctx.Err()
}

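pool.go keeps the same guard placement with the simplified helper: check the context before the first attempt and again on every pass of the retry loop, returning ctx.Err() so the caller sees the cancellation. A condensed sketch of that control flow; the node list and the fetchWith name are stand-ins for the pool internals, not the repository's API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func isCtxError(ctx context.Context) bool {
	return ctx.Err() != nil
}

// fetchWith mirrors the guard placement in fetchResourceWith: bail out
// before the first attempt and before each retry.
func fetchWith(ctx context.Context, nodes []string) error {
	if isCtxError(ctx) {
		return ctx.Err()
	}
	for i := 0; i < len(nodes); i++ {
		if isCtxError(ctx) {
			return ctx.Err()
		}
		// ... attempt the fetch against nodes[i] here ...
	}
	return errors.New("exhausted all nodes")
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	time.Sleep(5 * time.Millisecond) // let the deadline lapse
	fmt.Println(fetchWith(ctx, []string{"node-a", "node-b"})) // context deadline exceeded
}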
(diff for the fourth changed file did not load)
