-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathmetrics.go
74 lines (67 loc) · 2.31 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package ingest
import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/prometheus/client_golang/prometheus"
)
// Metric definitions used by the ingest stages. Each one is declared through
// operational.DefineMetric with a metric name, a help text, a metric type and
// the label names that its time series carry.

// latencyHistogram describes the histogram of the delay between a flow's end
// time and the moment it is ingested, expressed in milliseconds.
var latencyHistogram = operational.DefineMetric(
	"ingest_latency_ms",
	"Latency between flow end time and ingest time, in milliseconds",
	operational.TypeHistogram,
	"stage",
)

// flowsProcessedCounter describes the counter of flows received by the ingester.
var flowsProcessedCounter = operational.DefineMetric(
	"ingest_flows_processed",
	"Number of flows received by the ingester",
	operational.TypeCounter,
	"stage",
)

// batchSizeBytesSummary describes the summary of ingested batch sizes, in bytes.
var batchSizeBytesSummary = operational.DefineMetric(
	"ingest_batch_size_bytes",
	"Ingested batch size distribution, in bytes",
	operational.TypeSummary,
	"stage",
)

// errorsCounter describes the counter of ingestion errors. Unlike the other
// definitions it carries three labels: "stage", "type" and "code".
var errorsCounter = operational.DefineMetric(
	"ingest_errors",
	"Counter of errors during ingestion",
	operational.TypeCounter,
	"stage", "type", "code",
)
// metrics groups the shared operational metrics handle with the instruments
// that a single ingest stage reports to.
type metrics struct {
	// Metrics is embedded so that its methods can be called directly on metrics.
	*operational.Metrics

	// stage and stageType are passed as the first two label values whenever
	// the errors counter is incremented.
	stage, stageType string

	// stageDuration is the observer handed to the stage duration timer.
	stageDuration prometheus.Observer
	// latency is created from latencyHistogram for this stage.
	latency prometheus.Histogram
	// flowsProcessed is created from flowsProcessedCounter for this stage.
	flowsProcessed prometheus.Counter
	// batchSizeBytes is created from batchSizeBytesSummary for this stage.
	batchSizeBytes prometheus.Summary
	// errors is the counter vector created from errorsCounter.
	errors *prometheus.CounterVec
}
// newMetrics builds the metrics holder for one ingest stage.
//
// It first asks opMetrics to create the in-queue size gauge for stage, fed by
// inGaugeFunc, and then creates the latency histogram, stage duration observer,
// processed-flows counter, batch size summary and errors counter vector, in
// that order. stage and stageType are kept for later use as label values.
func newMetrics(opMetrics *operational.Metrics, stage, stageType string, inGaugeFunc func() int) *metrics {
	opMetrics.CreateInQueueSizeGauge(stage, inGaugeFunc)

	m := &metrics{
		Metrics:   opMetrics,
		stage:     stage,
		stageType: stageType,
	}

	// Bucket upper bounds for the latency histogram; the metric is declared in
	// milliseconds, so these span decades from 0.001 to 10000.
	latencyBuckets := []float64{.001, .01, .1, 1, 10, 100, 1000, 10000}

	// The instruments are created in the same sequence as the original
	// composite literal evaluated them.
	m.latency = opMetrics.NewHistogram(&latencyHistogram, latencyBuckets, stage)
	m.stageDuration = opMetrics.GetOrCreateStageDurationHisto().WithLabelValues(stage)
	m.flowsProcessed = opMetrics.NewCounter(&flowsProcessedCounter, stage)
	m.batchSizeBytes = opMetrics.NewSummary(&batchSizeBytesSummary, stage)
	m.errors = opMetrics.NewCounterVec(&errorsCounter)

	return m
}
// createOutQueueLen creates the out-queue size gauge for this stage. The gauge
// is fed by a callback that reports the number of elements currently buffered
// in the out channel.
func (m *metrics) createOutQueueLen(out chan<- config.GenericMap) {
	queued := func() int {
		return len(out)
	}
	m.Metrics.CreateOutQueueSizeGauge(m.stage, queued)
}
// error increments the errors counter for this stage, labelled with the stage
// name, the stage type and the given code.
//
// code should identify the kind of error relative to this stage type. A short
// string message is acceptable, but it must not embed dynamic values with high
// cardinality.
func (m *metrics) error(code string) {
	counter := m.errors.WithLabelValues(m.stage, m.stageType, code)
	counter.Inc()
}
// stageDurationTimer returns a new operational.Timer built on this stage's
// duration observer.
func (m *metrics) stageDurationTimer() *operational.Timer {
	timer := operational.NewTimer(m.stageDuration)
	return timer
}