Skip to content

Commit

Permalink
feat(metrics): add scheduler related metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Alexei Dodon <[email protected]>
  • Loading branch information
adodon2go committed Nov 22, 2023
1 parent 83f287d commit 651cd19
Show file tree
Hide file tree
Showing 20 changed files with 491 additions and 61 deletions.
2 changes: 1 addition & 1 deletion pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func (c *Controller) Shutdown() {
}

func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
taskScheduler := scheduler.NewScheduler(c.Config, c.Log)
taskScheduler := scheduler.NewScheduler(c.Config, c.Metrics, c.Log)
taskScheduler.RunScheduler(reloadCtx)

// Enable running garbage-collect periodically for DefaultStore
Expand Down
11 changes: 11 additions & 0 deletions pkg/api/cookiestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"context"
"encoding/gob"
"fmt"
"io/fs"
"os"
"path"
Expand Down Expand Up @@ -157,3 +158,13 @@ func (cleanTask *CleanTask) DoWork(ctx context.Context) error {

return nil
}

func (cleanTask *CleanTask) String() string {
return fmt.Sprintf("{SessionCleanup: %s, sessions: %s}",
"session clean task", // description of generator's task purpose
cleanTask.sessions)
}

func (cleanTask *CleanTask) Name() string {
return "SessionCleanupTask"
}
2 changes: 1 addition & 1 deletion pkg/extensions/extension_userprefs_disable.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ func IsBuiltWithUserPrefsExtension() bool {
func SetupUserPreferencesRoutes(config *config.Config, router *mux.Router,
metaDB mTypes.MetaDB, log log.Logger,
) {
log.Warn().Msg("userprefs extension is disabled because given zot binary doesn't" +
log.Warn().Msg("userprefs extension is disabled because given zot binary doesn't " +
"include this feature please build a binary that does so")
}
11 changes: 11 additions & 0 deletions pkg/extensions/imagetrust/image_trust.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package imagetrust

import (
"context"
"fmt"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -270,3 +271,13 @@ func (validityT *validityTask) DoWork(ctx context.Context) error {

return nil
}

func (validityT *validityTask) String() string {
return fmt.Sprintf("{sigValidityTaskGenerator: %s, repo: %s}",
"signatures validity task", // description of generator's task purpose
validityT.repo.Name)
}

func (validityT *validityTask) Name() string {
return "SignatureValidityTask"
}
93 changes: 92 additions & 1 deletion pkg/extensions/monitoring/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,53 @@ var (
},
[]string{"storageName", "lockType"},
)
schedulerGenerators = promauto.NewCounter( //nolint: gochecknoglobals
prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "scheduler_generators_total",
Help: "Total number of generators registered in scheduler",
},
)
schedulerGeneratorsStatus = promauto.NewGaugeVec( //nolint: gochecknoglobals
prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "scheduler_generators_status",
Help: "Scheduler generators by priority & state",
},
[]string{"priority", "state"},
)
schedulerNumWorkers = promauto.NewGauge( //nolint: gochecknoglobals
prometheus.GaugeOpts{ //nolint: promlinter
Namespace: metricsNamespace,
Name: "scheduler_workers_total",
Help: "Total number of available workers to perform scheduler tasks",
},
)
schedulerWorkers = promauto.NewGaugeVec( //nolint: gochecknoglobals
prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "scheduler_workers",
Help: "Scheduler workers state",
},
[]string{"state"},
)
schedulerTasksQueue = promauto.NewGaugeVec( //nolint: gochecknoglobals
prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "scheduler_tasksqueue_length",
Help: "Number of tasks waiting in the queue to pe processed by scheduler workers",
},
[]string{"priority"},
)
workersTasksDuration = promauto.NewHistogramVec( //nolint: gochecknoglobals
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Name: "scheduler_workers_tasks_duration_seconds",
Help: "How long it takes for a worker to execute a task",
Buckets: GetDefaultBuckets(),
},
[]string{"name"},
)
)

type metricServer struct {
Expand Down Expand Up @@ -169,7 +216,7 @@ func IncDownloadCounter(ms MetricServer, repo string) {
}

func SetStorageUsage(ms MetricServer, rootDir, repo string) {
ms.SendMetric(func() {
ms.ForceSendMetric(func() {
dir := path.Join(rootDir, repo)
repoSize, err := GetDirSize(dir)

Expand All @@ -196,3 +243,47 @@ func ObserveStorageLockLatency(ms MetricServer, latency time.Duration, storageNa
storageLockLatency.WithLabelValues(storageName, lockType).Observe(latency.Seconds())
})
}

func IncSchedulerGenerators(ms MetricServer) {
ms.ForceSendMetric(func() {
schedulerGenerators.Inc()
})
}

func SetSchedulerGenerators(ms MetricServer, gen map[string]map[string]uint64) {
ms.SendMetric(func() {
for priority, states := range gen {
for state, value := range states {
schedulerGeneratorsStatus.WithLabelValues(priority, state).Set(float64(value))
}
}
})
}

func SetSchedulerNumWorkers(ms MetricServer, total int) {
ms.SendMetric(func() {
schedulerNumWorkers.Set(float64(total))
})
}

func SetSchedulerWorkers(ms MetricServer, w map[string]int) {
ms.SendMetric(func() {
for state, value := range w {
schedulerWorkers.WithLabelValues(state).Set(float64(value))
}
})
}

func SetSchedulerTasksQueue(ms MetricServer, tq map[string]int) {
ms.SendMetric(func() {
for priority, value := range tq {
schedulerTasksQueue.WithLabelValues(priority).Set(float64(value))
}
})
}

func ObserveWorkersTasksDuration(ms MetricServer, taskName string, duration time.Duration) {
ms.SendMetric(func() {
workersTasksDuration.WithLabelValues(taskName).Observe(duration.Seconds())
})
}
95 changes: 85 additions & 10 deletions pkg/extensions/monitoring/minimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@ import (
const (
metricsNamespace = "zot"
// Counters.
httpConnRequests = metricsNamespace + ".http.requests"
repoDownloads = metricsNamespace + ".repo.downloads"
repoUploads = metricsNamespace + ".repo.uploads"
httpConnRequests = metricsNamespace + ".http.requests"
repoDownloads = metricsNamespace + ".repo.downloads"
repoUploads = metricsNamespace + ".repo.uploads"
schedulerGenerators = metricsNamespace + ".scheduler.generators"
// Gauge.
repoStorageBytes = metricsNamespace + ".repo.storage.bytes"
serverInfo = metricsNamespace + ".info"
repoStorageBytes = metricsNamespace + ".repo.storage.bytes"
serverInfo = metricsNamespace + ".info"
schedulerNumWorkers = metricsNamespace + ".scheduler.workers.total"
schedulerWorkers = metricsNamespace + ".scheduler.workers"
schedulerGeneratorsStatus = metricsNamespace + ".scheduler.generators.status"
schedulerTasksQueue = metricsNamespace + ".scheduler.tasksqueue.length"
// Summary.
httpRepoLatencySeconds = metricsNamespace + ".http.repo.latency.seconds"
// Histogram.
httpMethodLatencySeconds = metricsNamespace + ".http.method.latency.seconds"
storageLockLatencySeconds = metricsNamespace + ".storage.lock.latency.seconds"
workersTasksDuration = metricsNamespace + ".scheduler.workers.tasks.duration.seconds"

metricsScrapeTimeout = 2 * time.Minute
metricsScrapeCheckInterval = 30 * time.Second
Expand Down Expand Up @@ -215,16 +221,21 @@ func NewMetricsServer(enabled bool, log log.Logger) MetricServer {
// contains a map with key=CounterName and value=CounterLabels.
func GetCounters() map[string][]string {
return map[string][]string{
httpConnRequests: {"method", "code"},
repoDownloads: {"repo"},
repoUploads: {"repo"},
httpConnRequests: {"method", "code"},
repoDownloads: {"repo"},
repoUploads: {"repo"},
schedulerGenerators: {},
}
}

func GetGauges() map[string][]string {
return map[string][]string{
repoStorageBytes: {"repo"},
serverInfo: {"commit", "binaryType", "goVersion", "version"},
repoStorageBytes: {"repo"},
serverInfo: {"commit", "binaryType", "goVersion", "version"},
schedulerNumWorkers: {},
schedulerGeneratorsStatus: {"priority", "state"},
schedulerTasksQueue: {"priority"},
schedulerWorkers: {"state"},
}
}

Expand All @@ -238,6 +249,7 @@ func GetHistograms() map[string][]string {
return map[string][]string{
httpMethodLatencySeconds: {"method"},
storageLockLatencySeconds: {"storageName", "lockType"},
workersTasksDuration: {"name"},
}
}

Expand Down Expand Up @@ -533,3 +545,66 @@ func GetBuckets(metricName string) []float64 {
return GetDefaultBuckets()
}
}

func SetSchedulerNumWorkers(ms MetricServer, workers int) {
numWorkers := GaugeValue{
Name: schedulerNumWorkers,
Value: float64(workers),
}
ms.ForceSendMetric(numWorkers)
}

func IncSchedulerGenerators(ms MetricServer) {
genCounter := CounterValue{
Name: schedulerGenerators,
}
ms.ForceSendMetric(genCounter)
}

func ObserveWorkersTasksDuration(ms MetricServer, taskName string, duration time.Duration) {
h := HistogramValue{
Name: workersTasksDuration,
Sum: duration.Seconds(), // convenient temporary store for Histogram latency value
LabelNames: []string{"name"},
LabelValues: []string{taskName},
}
ms.SendMetric(h)
}

func SetSchedulerGenerators(ms MetricServer, gen map[string]map[string]uint64) {
for priority, states := range gen {
for state, value := range states {
generator := GaugeValue{
Name: schedulerGeneratorsStatus,
Value: float64(value),
LabelNames: []string{"priority", "state"},
LabelValues: []string{priority, state},
}
ms.SendMetric(generator)
}
}
}

func SetSchedulerTasksQueue(ms MetricServer, tq map[string]int) {
for priority, value := range tq {
tasks := GaugeValue{
Name: schedulerTasksQueue,
Value: float64(value),
LabelNames: []string{"priority"},
LabelValues: []string{priority},
}
ms.SendMetric(tasks)
}
}

func SetSchedulerWorkers(ms MetricServer, w map[string]int) {
for state, value := range w {
workers := GaugeValue{
Name: schedulerWorkers,
Value: float64(value),
LabelNames: []string{"state"},
LabelValues: []string{state},
}
ms.SendMetric(workers)
}
}
3 changes: 2 additions & 1 deletion pkg/extensions/monitoring/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,8 @@ func TestPopulateStorageMetrics(t *testing.T) {
err = WriteImageToFileSystem(CreateDefaultImage(), "busybox", "0.0.1", srcStorageCtlr)
So(err, ShouldBeNil)

sch := scheduler.NewScheduler(conf, ctlr.Log)
metrics := monitoring.NewMetricsServer(true, ctlr.Log)
sch := scheduler.NewScheduler(conf, metrics, ctlr.Log)
ctx, cancel := context.WithCancel(context.Background())
sch.RunScheduler(ctx)

Expand Down
10 changes: 10 additions & 0 deletions pkg/extensions/scrub/scrub.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,13 @@ func NewTask(imgStore storageTypes.ImageStore, repo string, log log.Logger) *Tas
func (scrubT *Task) DoWork(ctx context.Context) error {
return RunScrubRepo(ctx, scrubT.imgStore, scrubT.repo, scrubT.log) //nolint: contextcheck
}

func (scrubT *Task) String() string {
return fmt.Sprintf("{taskGenerator: \"%s\", repo: \"%s\"}",
"image scrub", // description of generator's task purpose
scrubT.repo)
}

func (scrubT *Task) Name() string {
return "ScrubTask"
}
11 changes: 11 additions & 0 deletions pkg/extensions/search/cve/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cveinfo

import (
"context"
"fmt"
"sync"

"zotregistry.io/zot/pkg/log"
Expand Down Expand Up @@ -194,3 +195,13 @@ func (st *scanTask) DoWork(ctx context.Context) error {

return nil
}

func (st *scanTask) String() string {
return fmt.Sprintf("{scanTaskGenerator: \"%s\", repo: \"%s\", digest: \"%s\"}",
"cve scan image", // description of generator's task purpose
st.repo, st.digest)
}

func (st *scanTask) Name() string {
return "ScanTask"
}
8 changes: 5 additions & 3 deletions pkg/extensions/search/cve/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func TestScanGeneratorWithMockedData(t *testing.T) { //nolint: gocyclo

cfg := config.New()
cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
sch := scheduler.NewScheduler(cfg, logger)
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(cfg, metrics, logger)

params := boltdb.DBParameters{
RootDir: t.TempDir(),
Expand Down Expand Up @@ -502,8 +503,9 @@ func TestScanGeneratorWithRealData(t *testing.T) {
metaDB, err := boltdb.New(boltDriver, logger)
So(err, ShouldBeNil)

metrics := monitoring.NewMetricsServer(true, logger)
imageStore := local.NewImageStore(rootDir, false, false,
logger, monitoring.NewMetricsServer(false, logger), nil, nil)
logger, metrics, nil, nil)
storeController := storage.StoreController{DefaultStore: imageStore}

image := CreateRandomVulnerableImage()
Expand All @@ -520,7 +522,7 @@ func TestScanGeneratorWithRealData(t *testing.T) {

So(scanner.IsResultCached(image.DigestStr()), ShouldBeFalse)

sch := scheduler.NewScheduler(cfg, logger)
sch := scheduler.NewScheduler(cfg, metrics, logger)

generator := cveinfo.NewScanTaskGenerator(metaDB, scanner, logger)

Expand Down
Loading

0 comments on commit 651cd19

Please sign in to comment.