Skip to content

Commit

Permalink
Add exemplars to latency histograms, improve metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
dimitarvdimitrov committed Nov 25, 2024
1 parent 945b0be commit 28c8594
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 4 deletions.
3 changes: 2 additions & 1 deletion pkg/storage/ingest/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/instrument"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
Expand Down Expand Up @@ -489,7 +490,7 @@ func recordIndexAfterOffset(records []*kgo.Record, offset int64) int {
func (r *concurrentFetchers) recordOrderedFetchTelemetry(f fetchResult, firstReturnedRecordIndex int, waitStartTime time.Time) {
waitDuration := time.Since(waitStartTime)
level.Debug(r.logger).Log("msg", "received ordered fetch", "num_records", len(f.Records), "wait_duration", waitDuration)
r.metrics.fetchWaitDuration.Observe(waitDuration.Seconds())
instrument.ObserveWithExemplar(f.ctx, r.metrics.fetchWaitDuration, waitDuration.Seconds())

var (
doubleFetchedBytes = 0
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/ingest/pusher_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics
storagePusherMetrics: newStoragePusherMetrics(reg),
processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingest_storage_reader_records_processing_time_seconds",
Help: "Time taken to process a batch of fetched records. Fetched records are effectively a set of WriteRequests read from Kafka.",
Help: "Time taken to process a batch of fetched records. Fetched records are effectively a set of WriteRequests read from Kafka. This is the latency of a single attempt and does not include retries.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/instrument"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -607,7 +608,7 @@ func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetche
MaxRetries: 0, // retry forever
})
defer func(consumeStart time.Time) {
r.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds())
instrument.ObserveWithExemplar(ctx, r.metrics.consumeLatency, time.Since(consumeStart).Seconds())
}(time.Now())

logger := spanlogger.FromContext(ctx, r.logger)
Expand Down Expand Up @@ -1078,7 +1079,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
}),
consumeLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingest_storage_reader_records_batch_process_duration_seconds",
Help: "How long a consumer spent processing a batch of records from Kafka.",
Help: "How long a consumer spent processing a batch of records from Kafka. This includes retries on server errors.",
NativeHistogramBucketFactor: 1.1,
}),
strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg),
Expand Down

0 comments on commit 28c8594

Please sign in to comment.