Skip to content

Commit

Permalink
wip: rq metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Daichi Sakaue <[email protected]>
  • Loading branch information
yokaze committed Dec 5, 2023
1 parent 20eadbd commit 9b8b1bb
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 22 deletions.
2 changes: 2 additions & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ CKE exposes the following metrics with the Prometheus format at `/metrics` REST
| node_reboot_status | The reboot status of a node. | Gauge | `node`, `status` |
| operation_phase | 1 if CKE is operating in the phase specified by the `phase` label. | Gauge | `phase` |
| operation_phase_timestamp_seconds | The Unix timestamp when `operation_phase` was last updated. | Gauge | |
| reboot_queue_enabled | True (=1) if reboot queue is enabled. | Gauge | |
| reboot_queue_entries | The number of reboot queue entries remaining. | Gauge | |
| reboot_queue_items | The number reboot queue entries remaining per status. | Gauge | `status` |
| reboot_queue_running | True (=1) if reboot queue is enabled and the queue is not empty. | Gauge | |
| sabakan_integration_successful | True (=1) if sabakan-integration satisfies constraints. | Gauge | |
| sabakan_integration_timestamp_seconds | The Unix timestamp when `sabakan_integration_successful` was last updated. | Gauge | |
| sabakan_workers | The number of worker nodes for each role. | Gauge | `role` |
Expand Down
29 changes: 29 additions & 0 deletions metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type metricGroup struct {
// This abstraction is for mock test.
type storage interface {
IsSabakanDisabled(context.Context) (bool, error)
IsRebootQueueDisabled(ctx context.Context) (bool, error)
GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error)
GetCluster(ctx context.Context) (*cke.Cluster, error)
}
Expand Down Expand Up @@ -131,15 +132,25 @@ type nodeMetricsCollector struct {
var _ prometheus.Collector = &nodeMetricsCollector{}

func (c nodeMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- rebootQueueEnabled
ch <- rebootQueueEntries
ch <- rebootQueueItems
ch <- rebootQueueRunning
ch <- nodeRebootStatus
}

func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

rqDisabled, err := c.storage.IsRebootQueueDisabled(ctx)
if err != nil {
log.Error("failed to get if reboot queue is enabled", map[string]interface{}{
log.FnError: err,
})
return
}

rqEntries, err := c.storage.GetRebootsEntries(ctx)
if err != nil {
log.Error("failed to get reboots entries", map[string]interface{}{
Expand All @@ -148,6 +159,14 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) {
return
}

var rqEnabled, rqRunning float64
if !rqDisabled {
rqEnabled = 1
}
if !rqDisabled && len(rqEntries) > 0 {
rqRunning = 1
}

cluster, err := c.storage.GetCluster(ctx)
if err != nil {
log.Error("failed to get cluster", map[string]interface{}{
Expand All @@ -158,11 +177,21 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) {
itemCounts := cke.CountRebootQueueEntries(rqEntries)
nodeStatus := cke.BuildNodeRebootStatus(cluster.Nodes, rqEntries)

ch <- prometheus.MustNewConstMetric(
rebootQueueEnabled,
prometheus.GaugeValue,
rqEnabled,
)
ch <- prometheus.MustNewConstMetric(
rebootQueueEntries,
prometheus.GaugeValue,
float64(len(rqEntries)),
)
ch <- prometheus.MustNewConstMetric(
rebootQueueRunning,
prometheus.GaugeValue,
rqRunning,
)
for status, count := range itemCounts {
ch <- prometheus.MustNewConstMetric(
rebootQueueItems,
Expand Down
14 changes: 14 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ var operationPhaseTimestampSeconds = prometheus.NewGauge(
},
)

var rebootQueueEnabled = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "reboot_queue_enabled"),
"1 if reboot queue is enabled.",
nil,
nil,
)

var rebootQueueEntries = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "reboot_queue_entries"),
"The number of reboot queue entries remaining.",
Expand All @@ -47,6 +54,13 @@ var rebootQueueItems = prometheus.NewDesc(
nil,
)

var rebootQueueRunning = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "reboot_queue_running"),
"1 if reboot queue is enabled and the queue is not empty.",
nil,
nil,
)

var nodeRebootStatus = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "node_reboot_status"),
"The reboot status of a node.",
Expand Down
99 changes: 77 additions & 22 deletions metrics/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ type updateOperationPhaseTestCase struct {
}

type updateRebootQueueEntriesTestCase struct {
name string
input []*cke.RebootQueueEntry
expected float64
name string
enabled bool
input []*cke.RebootQueueEntry
expectedEnabled float64
expectedRunning float64
expectedEntries float64
}

type updateRebootQueueItemsTestCase struct {
Expand Down Expand Up @@ -251,24 +254,44 @@ func testUpdateOperationPhase(t *testing.T) {
func testUpdateRebootQueueEntries(t *testing.T) {
testCases := []updateRebootQueueEntriesTestCase{
{
name: "zero",
input: nil,
expected: 0,
name: "zero",
enabled: true,
input: nil,
expectedEnabled: 1,
expectedRunning: 0,
expectedEntries: 0,
},
{
name: "one",
name: "one",
enabled: true,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
},
expected: 1,
expectedEnabled: 1,
expectedRunning: 1,
expectedEntries: 1,
},
{
name: "two",
enabled: true,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
},
expectedEnabled: 1,
expectedRunning: 1,
expectedEntries: 2,
},
{
name: "two",
name: "two-disabled",
enabled: false,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
},
expected: 2,
expectedEnabled: 0,
expectedRunning: 0,
expectedEntries: 2,
},
}
for _, tt := range testCases {
Expand All @@ -277,6 +300,7 @@ func testUpdateRebootQueueEntries(t *testing.T) {
defer ctx.Done()

collector, storage := newTestCollector()
storage.enableRebootQueue(tt.enabled)
storage.setRebootsEntries(tt.input)
handler := GetHandler(collector)

Expand All @@ -289,19 +313,41 @@ func testUpdateRebootQueueEntries(t *testing.T) {
t.Fatal(err)
}

metricsFound := false
metricsEnabledFound := false
metricsRunningFound := false
metricsEntriesFound := false
for _, mf := range metricsFamily {
if *mf.Name != "cke_reboot_queue_entries" {
continue
}
for _, m := range mf.Metric {
metricsFound = true
if *m.Gauge.Value != tt.expected {
t.Errorf("value for cke_reboot_queue_entries is wrong. expected: %f, actual: %f", tt.expected, *m.Gauge.Value)
switch *mf.Name {
case "cke_reboot_queue_enabled":
for _, m := range mf.Metric {
metricsEnabledFound = true
if *m.Gauge.Value != tt.expectedEnabled {
t.Errorf("value for cke_reboot_queue_enabled is wrong. expected: %f, actual: %f", tt.expectedEnabled, *m.Gauge.Value)
}
}
case "cke_reboot_queue_running":
for _, m := range mf.Metric {
metricsRunningFound = true
if *m.Gauge.Value != tt.expectedEnabled {
t.Errorf("value for cke_reboot_queue_running is wrong. expected: %f, actual: %f", tt.expectedRunning, *m.Gauge.Value)
}
}
case "cke_reboot_queue_entries":
for _, m := range mf.Metric {
metricsEntriesFound = true
if *m.Gauge.Value != tt.expectedEntries {
t.Errorf("value for cke_reboot_queue_entries is wrong. expected: %f, actual: %f", tt.expectedEntries, *m.Gauge.Value)
}
}
}
}
if !metricsFound {
if !metricsEnabledFound {
t.Errorf("metrics reboot_queue_enabled was not found")
}
if !metricsRunningFound {
t.Errorf("metrics reboot_queue_running was not found")
}
if !metricsEntriesFound {
t.Errorf("metrics reboot_queue_entries was not found")
}
})
Expand Down Expand Up @@ -623,9 +669,10 @@ func newTestCollector() (prometheus.Collector, *testStorage) {
}

type testStorage struct {
sabakanEnabled bool
rebootEntries []*cke.RebootQueueEntry
cluster *cke.Cluster
sabakanEnabled bool
rebootQueueEnabled bool
rebootEntries []*cke.RebootQueueEntry
cluster *cke.Cluster
}

func (s *testStorage) enableSabakan(flag bool) {
Expand All @@ -636,6 +683,14 @@ func (s *testStorage) IsSabakanDisabled(_ context.Context) (bool, error) {
return !s.sabakanEnabled, nil
}

func (s *testStorage) IsRebootQueueDisabled(_ context.Context) (bool, error) {
return !s.rebootQueueEnabled, nil
}

func (s *testStorage) enableRebootQueue(flag bool) error {
s.rebootQueueEnabled = flag
}

Check failure on line 692 in metrics/updater_test.go

View workflow job for this annotation

GitHub Actions / Build CKE

missing return (compile)

func (s *testStorage) setRebootsEntries(entries []*cke.RebootQueueEntry) {
s.rebootEntries = entries
}
Expand Down

0 comments on commit 9b8b1bb

Please sign in to comment.