diff --git a/docs/metrics.md b/docs/metrics.md index 86420df4..f1b8a01d 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -12,7 +12,7 @@ CKE exposes the following metrics with the Prometheus format at `/metrics` REST | reboot_queue_enabled | True (=1) if reboot queue is enabled. | Gauge | | | reboot_queue_entries | The number of reboot queue entries remaining. | Gauge | | | reboot_queue_items | The number reboot queue entries remaining per status. | Gauge | `status` | -| reboot_queue_running | True (=1) if reboot queue is enabled and the queue is not empty. | Gauge | | +| reboot_queue_running | True (=1) if reboot queue is running. | Gauge | | | sabakan_integration_successful | True (=1) if sabakan-integration satisfies constraints. | Gauge | | | sabakan_integration_timestamp_seconds | The Unix timestamp when `sabakan_integration_successful` was last updated. | Gauge | | | sabakan_workers | The number of worker nodes for each role. | Gauge | `role` | diff --git a/metrics/collector.go b/metrics/collector.go index b594022c..6efe25ed 100644 --- a/metrics/collector.go +++ b/metrics/collector.go @@ -40,6 +40,7 @@ type metricGroup struct { type storage interface { IsSabakanDisabled(context.Context) (bool, error) IsRebootQueueDisabled(ctx context.Context) (bool, error) + IsRebootQueueRunning(ctx context.Context) (bool, error) GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error) GetCluster(ctx context.Context) (*cke.Cluster, error) } @@ -143,30 +144,38 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - rqDisabled, err := c.storage.IsRebootQueueDisabled(ctx) + disabled, err := c.storage.IsRebootQueueDisabled(ctx) if err != nil { log.Error("failed to get if reboot queue is enabled", map[string]interface{}{ log.FnError: err, }) return } + var rqEnabled float64 + if !disabled { + rqEnabled = 1 + } - rqEntries, err := c.storage.GetRebootsEntries(ctx) + running, err := c.storage.IsRebootQueueRunning(ctx) if err != nil { - log.Error("failed to get reboots entries", map[string]interface{}{ + log.Error("failed to get if reboot queue is running", map[string]interface{}{ log.FnError: err, }) return } - - var rqEnabled, rqRunning float64 - if !rqDisabled { - rqEnabled = 1 - } - if !rqDisabled && len(rqEntries) > 0 { + var rqRunning float64 + if running { rqRunning = 1 } + rqEntries, err := c.storage.GetRebootsEntries(ctx) + if err != nil { + log.Error("failed to get reboots entries", map[string]interface{}{ + log.FnError: err, + }) + return + } + cluster, err := c.storage.GetCluster(ctx) if err != nil { log.Error("failed to get cluster", map[string]interface{}{ diff --git a/metrics/updater_test.go b/metrics/updater_test.go index 07c8c725..d9fdba04 100644 --- a/metrics/updater_test.go +++ b/metrics/updater_test.go @@ -44,6 +44,7 @@ type updateOperationPhaseTestCase struct { type updateRebootQueueEntriesTestCase struct { name string enabled bool + running bool input []*cke.RebootQueueEntry expectedEnabled float64 expectedRunning float64 @@ -256,6 +257,7 @@ func testUpdateRebootQueueEntries(t *testing.T) { { name: "zero", enabled: true, + running: false, input: nil, expectedEnabled: 1, expectedRunning: 0, @@ -264,6 +266,7 @@ func testUpdateRebootQueueEntries(t *testing.T) { { name: "one", enabled: true, + running: true, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, }, @@ -274,6 +277,7 @@ func testUpdateRebootQueueEntries(t *testing.T) { { name: "two", enabled: true, + running: true, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, {Status: cke.RebootStatusRebooting}, @@ -282,9 +286,22 @@ func testUpdateRebootQueueEntries(t *testing.T) { expectedRunning: 1, expectedEntries: 2, }, + { + name: "two-stopping", + enabled: false, + running: true, + input: []*cke.RebootQueueEntry{ + {Status: cke.RebootStatusQueued}, + {Status: cke.RebootStatusRebooting}, + }, + expectedEnabled: 0, + expectedRunning: 1, + expectedEntries: 2, + }, { name: "two-disabled", enabled: false, + running: false, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, {Status: cke.RebootStatusRebooting}, @@ -301,6 +318,7 @@ func testUpdateRebootQueueEntries(t *testing.T) { collector, storage := newTestCollector() storage.enableRebootQueue(tt.enabled) + storage.setRebootQueueRunning(tt.running) storage.setRebootsEntries(tt.input) handler := GetHandler(collector) @@ -671,6 +689,7 @@ func newTestCollector() (prometheus.Collector, *testStorage) { type testStorage struct { sabakanEnabled bool rebootQueueEnabled bool + rebootQueueRunning bool rebootEntries []*cke.RebootQueueEntry cluster *cke.Cluster } @@ -691,6 +710,14 @@ func (s *testStorage) enableRebootQueue(flag bool) { s.rebootQueueEnabled = flag } +func (s *testStorage) IsRebootQueueRunning(_ context.Context) (bool, error) { + return s.rebootQueueRunning, nil +} + +func (s *testStorage) setRebootQueueRunning(flag bool) { + s.rebootQueueRunning = flag +} + func (s *testStorage) setRebootsEntries(entries []*cke.RebootQueueEntry) { s.rebootEntries = entries } diff --git a/server/control.go b/server/control.go index c76e5229..0125ca1e 100644 --- a/server/control.go +++ b/server/control.go @@ -326,6 +326,18 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t } } + running, err := inf.Storage().IsRebootQueueRunning(ctx) + if err != nil { + return err + } + runningNext := len(rqEntries) > 0 + if running != runningNext { + err := inf.Storage().SetRebootQueueRunning(ctx, runningNext) + if err != nil { + return err + } + } + nf := NewNodeFilter(cluster, status) apiServers := map[string]bool{} for _, node := range nf.ControlPlane() { diff --git a/storage.go b/storage.go index 5895b8a8..65f714b0 100644 --- a/storage.go +++ b/storage.go @@ -31,6 +31,7 @@ const ( KeyConstraints = "constraints" KeyLeader = "leader/" KeyRebootsDisabled = "reboots/disabled" + KeyRebootsRunning = "reboots/running" KeyRebootsPrefix = "reboots/data/" KeyRebootsWriteIndex = "reboots/write-index" KeyRecords = "records/" @@ -705,6 +706,31 @@ func (s Storage) EnableRebootQueue(ctx context.Context, flag bool) error { return err } +// IsRebootQueueRunning returns true if CKE is processing reboot queue. +func (s Storage) IsRebootQueueRunning(ctx context.Context) (bool, error) { + resp, err := s.Get(ctx, KeyRebootsRunning) + if err != nil { + return false, err + } + if resp.Count == 0 { + return false, nil + } + + return bytes.Equal([]byte("true"), resp.Kvs[0].Value), nil +} + +// SetRebootQueueRunning is used to report if CKE is processing reboot queue. +func (s Storage) SetRebootQueueRunning(ctx context.Context, flag bool) error { + var val string + if flag { + val = "true" + } else { + val = "false" + } + _, err := s.Put(ctx, KeyRebootsRunning, val) + return err +} + func rebootsEntryKey(index int64) string { return fmt.Sprintf("%s%016x", KeyRebootsPrefix, index) } diff --git a/storage_test.go b/storage_test.go index 5daac1b0..8ab7a5f5 100644 --- a/storage_test.go +++ b/storage_test.go @@ -748,7 +748,7 @@ func testStorageReboot(t *testing.T) { t.Fatal(err) } if disabled { - t.Error("reboot queue should not be disabled by default") + t.Error("reboot queue should be enabled by default") } // disable rq and get its state @@ -776,6 +776,41 @@ func testStorageReboot(t *testing.T) { if disabled { t.Error("reboot queue could not be re-enabled") } + + // rq is not running by default + running, err := storage.IsRebootQueueRunning(ctx) + if err != nil { + t.Fatal(err) + } + if running { + t.Error("reboot queue should not be running by default") + } + + // report running rq and get its state + err = storage.SetRebootQueueRunning(ctx, true) + if err != nil { + t.Fatal(err) + } + running, err = storage.IsRebootQueueRunning(ctx) + if err != nil { + t.Fatal(err) + } + if !running { + t.Error("reboot queue could not be reported running") + } + + // report not running rq and get its state + err = storage.SetRebootQueueRunning(ctx, false) + if err != nil { + t.Fatal(err) + } + running, err = storage.IsRebootQueueRunning(ctx) + if err != nil { + t.Fatal(err) + } + if running { + t.Error("reboot queue could not be reported not running") + } } func testStatus(t *testing.T) {