diff --git a/docs/metrics.md b/docs/metrics.md index 9b89be72..f1b8a01d 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -9,10 +9,10 @@ CKE exposes the following metrics with the Prometheus format at `/metrics` REST | node_reboot_status | The reboot status of a node. | Gauge | `node`, `status` | | operation_phase | 1 if CKE is operating in the phase specified by the `phase` label. | Gauge | `phase` | | operation_phase_timestamp_seconds | The Unix timestamp when `operation_phase` was last updated. | Gauge | | -| reboot_queue_enabled | True (=1) if reboot queue is enabled or stopping. | Gauge | | +| reboot_queue_enabled | True (=1) if reboot queue is enabled. | Gauge | | | reboot_queue_entries | The number of reboot queue entries remaining. | Gauge | | | reboot_queue_items | The number reboot queue entries remaining per status. | Gauge | `status` | -| reboot_queue_running | True (=1) if reboot queue is enabled and the queue is not empty. | Gauge | | +| reboot_queue_running | True (=1) if reboot queue is running. | Gauge | | | sabakan_integration_successful | True (=1) if sabakan-integration satisfies constraints. | Gauge | | | sabakan_integration_timestamp_seconds | The Unix timestamp when `sabakan_integration_successful` was last updated. | Gauge | | | sabakan_workers | The number of worker nodes for each role. | Gauge | `role` | diff --git a/docs/reboot.md b/docs/reboot.md index 584cdb1a..465eb3b7 100644 --- a/docs/reboot.md +++ b/docs/reboot.md @@ -43,7 +43,7 @@ The command writes reboot queue entry(s) and increments `reboots/write-index` at The queue is processed by CKE as follows: -1. If `reboots/state` is not `enabled`, it will not process the queue. +1. If `reboots/disabled` is `true`, it doesn't process the queue. 2. Check the reboot queue to find an entry. - If the number of nodes under processing is less than maximum concurrent reboots and the number of unreachable nodes that are not under this reboot process is not more than `maximum-unreachable-nodes-for-reboot` in the constraints, pick several nodes from front of the queue and start draining them. 1. Cordon the node. diff --git a/metrics/collector.go b/metrics/collector.go index 7f3222e7..6efe25ed 100644 --- a/metrics/collector.go +++ b/metrics/collector.go @@ -39,7 +39,8 @@ type metricGroup struct { // This abstraction is for mock test. type storage interface { IsSabakanDisabled(context.Context) (bool, error) - GetRebootQueueState(ctx context.Context) (cke.RebootQueueState, error) + IsRebootQueueDisabled(ctx context.Context) (bool, error) + IsRebootQueueRunning(ctx context.Context) (bool, error) GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error) GetCluster(ctx context.Context) (*cke.Cluster, error) } @@ -143,28 +144,36 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - rqState, err := c.storage.GetRebootQueueState(ctx) + disabled, err := c.storage.IsRebootQueueDisabled(ctx) if err != nil { log.Error("failed to get if reboot queue is enabled", map[string]interface{}{ log.FnError: err, }) return } + var rqEnabled float64 + if !disabled { + rqEnabled = 1 + } - rqEntries, err := c.storage.GetRebootsEntries(ctx) + running, err := c.storage.IsRebootQueueRunning(ctx) if err != nil { - log.Error("failed to get reboots entries", map[string]interface{}{ + log.Error("failed to get if reboot queue is running", map[string]interface{}{ log.FnError: err, }) return } + var rqRunning float64 + if running { + rqRunning = 1 + } - var rqEnabled, rqRunning float64 - if rqState == cke.RebootQueueStateEnabled || rqState == cke.RebootQueueStateStopping { - rqEnabled = 1 - if len(rqEntries) > 0 { - rqRunning = 1 - } + rqEntries, err := c.storage.GetRebootsEntries(ctx) + if err != nil { + log.Error("failed to get reboots entries", map[string]interface{}{ + log.FnError: err, + }) + return } cluster, err := c.storage.GetCluster(ctx) diff --git a/metrics/updater_test.go b/metrics/updater_test.go index 90b07fac..af3733b3 100644 --- a/metrics/updater_test.go +++ b/metrics/updater_test.go @@ -43,7 +43,8 @@ type updateOperationPhaseTestCase struct { type updateRebootQueueEntriesTestCase struct { name string - state cke.RebootQueueState + enabled bool + running bool input []*cke.RebootQueueEntry expectedEnabled float64 expectedRunning float64 @@ -255,15 +256,17 @@ func testUpdateRebootQueueEntries(t *testing.T) { testCases := []updateRebootQueueEntriesTestCase{ { name: "zero", - state: cke.RebootQueueStateEnabled, + enabled: true, + running: false, input: nil, expectedEnabled: 1, expectedRunning: 0, expectedEntries: 0, }, { - name: "one", - state: cke.RebootQueueStateEnabled, + name: "one", + enabled: true, + running: true, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, }, @@ -272,8 +275,9 @@ func testUpdateRebootQueueEntries(t *testing.T) { expectedEntries: 1, }, { - name: "two", - state: cke.RebootQueueStateEnabled, + name: "two", + enabled: true, + running: true, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, {Status: cke.RebootStatusRebooting}, @@ -283,8 +287,9 @@ func testUpdateRebootQueueEntries(t *testing.T) { expectedEntries: 2, }, { - name: "two-stopping", - state: cke.RebootQueueStateStopping, + name: "two-stopping", + enabled: false, + running: true, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, {Status: cke.RebootStatusRebooting}, @@ -294,8 +299,9 @@ func testUpdateRebootQueueEntries(t *testing.T) { expectedEntries: 2, }, { - name: "two-disabled", - state: cke.RebootQueueStateDisabled, + name: "two-disabled", + enabled: false, + running: false, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, {Status: cke.RebootStatusRebooting}, @@ -311,7 +317,8 @@ func testUpdateRebootQueueEntries(t *testing.T) { defer ctx.Done() collector, storage := newTestCollector() - storage.setRebootQueueState(tt.state) + storage.enableRebootQueue(tt.enabled) + storage.setRebootQueueRunning(tt.running) storage.setRebootsEntries(tt.input) handler := GetHandler(collector) @@ -680,10 +687,11 @@ func newTestCollector() (prometheus.Collector, *testStorage) { } type testStorage struct { - sabakanEnabled bool - rebootQueueState cke.RebootQueueState - rebootEntries []*cke.RebootQueueEntry - cluster *cke.Cluster + sabakanEnabled bool + rebootQueueEnabled bool + rebootQueueRunning bool + rebootEntries []*cke.RebootQueueEntry + cluster *cke.Cluster } func (s *testStorage) enableSabakan(flag bool) { @@ -694,12 +702,20 @@ func (s *testStorage) IsSabakanDisabled(_ context.Context) (bool, error) { return !s.sabakanEnabled, nil } -func (s *testStorage) GetRebootQueueState(_ context.Context) (cke.RebootQueueState, error) { - return s.rebootQueueState, nil +func (s *testStorage) IsRebootQueueDisabled(_ context.Context) (bool, error) { + return !s.rebootQueueEnabled, nil } -func (s *testStorage) setRebootQueueState(state cke.RebootQueueState) { - s.rebootQueueState = state +func (s *testStorage) enableRebootQueue(flag bool) { + s.rebootQueueEnabled = flag +} + +func (s *testStorage) IsRebootQueueRunning(_ context.Context) (bool, error) { + return s.rebootQueueRunning, nil +} + +func (s *testStorage) setRebootQueueRunning(flag bool) { + s.rebootQueueRunning = flag } func (s *testStorage) setRebootsEntries(entries []*cke.RebootQueueEntry) { diff --git a/pkg/ckecli/cmd/reboot_queue_disable.go b/pkg/ckecli/cmd/reboot_queue_disable.go index 0c695c99..55a6cf9e 100644 --- a/pkg/ckecli/cmd/reboot_queue_disable.go +++ b/pkg/ckecli/cmd/reboot_queue_disable.go @@ -3,7 +3,6 @@ package cmd import ( "context" - "github.com/cybozu-go/cke" "github.com/cybozu-go/well" "github.com/spf13/cobra" ) @@ -15,7 +14,7 @@ var rebootQueueDisableCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { well.Go(func(ctx context.Context) error { - return storage.SetRebootQueueState(ctx, cke.RebootQueueStateStopping) + return storage.EnableRebootQueue(ctx, false) }) well.Stop() return well.Wait() diff --git a/pkg/ckecli/cmd/reboot_queue_enable.go b/pkg/ckecli/cmd/reboot_queue_enable.go index ede7f50a..a435dc9a 100644 --- a/pkg/ckecli/cmd/reboot_queue_enable.go +++ b/pkg/ckecli/cmd/reboot_queue_enable.go @@ -3,7 +3,6 @@ package cmd import ( "context" - "github.com/cybozu-go/cke" "github.com/cybozu-go/well" "github.com/spf13/cobra" ) @@ -15,7 +14,7 @@ var rebootQueueEnableCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { well.Go(func(ctx context.Context) error { - return storage.SetRebootQueueState(ctx, cke.RebootQueueStateEnabled) + return storage.EnableRebootQueue(ctx, true) }) well.Stop() return well.Wait() diff --git a/pkg/ckecli/cmd/reboot_queue_is_enabled.go b/pkg/ckecli/cmd/reboot_queue_is_enabled.go index 3a97d313..4b4e2e10 100644 --- a/pkg/ckecli/cmd/reboot_queue_is_enabled.go +++ b/pkg/ckecli/cmd/reboot_queue_is_enabled.go @@ -15,11 +15,11 @@ var rebootQueueIsEnabledCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { well.Go(func(ctx context.Context) error { - state, err := storage.GetRebootQueueState(ctx) + disabled, err := storage.IsRebootQueueDisabled(ctx) if err != nil { return err } - fmt.Println(string(state)) + fmt.Println(!disabled) return nil }) well.Stop() diff --git a/reboot.go b/reboot.go index df3b32a6..94de3fff 100644 --- a/reboot.go +++ b/reboot.go @@ -4,19 +4,6 @@ import ( "time" ) -// RebootQueueState is state of reboot queue -type RebootQueueState string - -// RebootQueue states -const ( - // Reboot queue is enabled (should be set by ckecli) - RebootQueueStateEnabled = RebootQueueState("enabled") - // Reboot queue is requested to stop (should be set by ckecli) - RebootQueueStateStopping = RebootQueueState("stopping") - // Reboot queue is disabled (should be set by CKE) - RebootQueueStateDisabled = RebootQueueState("disabled") -) - // RebootStatus is status of reboot operation type RebootStatus string diff --git a/server/control.go b/server/control.go index 08bb4fe7..c91f6f62 100644 --- a/server/control.go +++ b/server/control.go @@ -317,21 +317,27 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t rqEntries = cke.DedupRebootQueueEntries(rqEntries) if len(rqEntries) > 0 { - rqState, err := inf.Storage().GetRebootQueueState(ctx) - switch { - case err != nil: + disabled, err := inf.Storage().IsRebootQueueDisabled(ctx) + if err != nil { return err - case rqState == cke.RebootQueueStateStopping: - err := inf.Storage().SetRebootQueueState(ctx, cke.RebootQueueStateDisabled) - if err != nil { - return err - } - rqEntries = nil - case rqState == cke.RebootQueueStateDisabled: + } + if disabled { rqEntries = nil } } + running, err := inf.Storage().IsRebootQueueRunning(ctx) + if err != nil { + return err + } + runningNext := (rqEntries != nil) + if running != runningNext { + err := inf.Storage().SetRebootQueueRunning(ctx, runningNext) + if err != nil { + return err + } + } + nf := NewNodeFilter(cluster, status) apiServers := map[string]bool{} for _, node := range nf.ControlPlane() { diff --git a/storage.go b/storage.go index 915f8702..65f714b0 100644 --- a/storage.go +++ b/storage.go @@ -30,8 +30,8 @@ const ( KeyClusterRevision = "cluster-revision" KeyConstraints = "constraints" KeyLeader = "leader/" - KeyRebootsDisabled = "reboots/disabled" // TODO: remove this key - KeyRebootsState = "reboots/state" + KeyRebootsDisabled = "reboots/disabled" + KeyRebootsRunning = "reboots/running" KeyRebootsPrefix = "reboots/data/" KeyRebootsWriteIndex = "reboots/write-index" KeyRecords = "records/" @@ -680,35 +680,54 @@ func (s Storage) GetSabakanURL(ctx context.Context) (string, error) { return s.getStringValue(ctx, KeySabakanURL) } -// GetRebootQueueState returns reboot queue state. -func (s Storage) GetRebootQueueState(ctx context.Context) (RebootQueueState, error) { - resp, err := s.Get(ctx, KeyRebootsState) +// IsRebootQueueDisabled returns true if reboot queue is disabled. +func (s Storage) IsRebootQueueDisabled(ctx context.Context) (bool, error) { + resp, err := s.Get(ctx, KeyRebootsDisabled) if err != nil { - // defaulted to on - return RebootQueueStateEnabled, err + return false, err } if resp.Count == 0 { - return RebootQueueStateEnabled, nil + return false, nil } - return RebootQueueState(resp.Kvs[0].Value), nil + return bytes.Equal([]byte("true"), resp.Kvs[0].Value), nil } -// SetRebootQueueState sets reboot queue state. -func (s Storage) SetRebootQueueState(ctx context.Context, state RebootQueueState) error { - // TODO: remove this temporary code - resp, err := s.Get(ctx, KeyRebootsDisabled) +// EnableRebootQueue enables reboot queue processing when flag is true. +// When flag is false, reboot queue is not processed. +func (s Storage) EnableRebootQueue(ctx context.Context, flag bool) error { + var val string + if flag { + val = "false" + } else { + val = "true" + } + _, err := s.Put(ctx, KeyRebootsDisabled, val) + return err +} + +// IsRebootQueueRunning returns true if CKE is processing reboot queue. +func (s Storage) IsRebootQueueRunning(ctx context.Context) (bool, error) { + resp, err := s.Get(ctx, KeyRebootsRunning) if err != nil { - return err + return false, err } - if resp.Count > 0 { - _, err := s.Delete(ctx, KeyRebootsDisabled) - if err != nil { - return err - } + if resp.Count == 0 { + return false, nil } - _, err = s.Put(ctx, KeyRebootsState, string(state)) + return bytes.Equal([]byte("true"), resp.Kvs[0].Value), nil +} + +// SetRebootQueueRunning is used to report if CKE is processing reboot queue. +func (s Storage) SetRebootQueueRunning(ctx context.Context, flag bool) error { + var val string + if flag { + val = "true" + } else { + val = "false" + } + _, err := s.Put(ctx, KeyRebootsRunning, val) return err } diff --git a/storage_test.go b/storage_test.go index 2708babd..8ab7a5f5 100644 --- a/storage_test.go +++ b/storage_test.go @@ -743,39 +743,74 @@ func testStorageReboot(t *testing.T) { } // rq is enabled by default - state, err := storage.GetRebootQueueState(ctx) + disabled, err := storage.IsRebootQueueDisabled(ctx) if err != nil { t.Fatal(err) } - if state != RebootQueueStateEnabled { + if disabled { t.Error("reboot queue should be enabled by default") } // disable rq and get its state - err = storage.SetRebootQueueState(ctx, RebootQueueStateStopping) + err = storage.EnableRebootQueue(ctx, false) if err != nil { t.Fatal(err) } - state, err = storage.GetRebootQueueState(ctx) + disabled, err = storage.IsRebootQueueDisabled(ctx) if err != nil { t.Fatal(err) } - if state != RebootQueueStateStopping { - t.Error("reboot queue state is not updated correctly") + if !disabled { + t.Error("reboot queue could not be disabled") } // re-enable rq and get its state - err = storage.SetRebootQueueState(ctx, RebootQueueStateEnabled) + err = storage.EnableRebootQueue(ctx, true) if err != nil { t.Fatal(err) } - state, err = storage.GetRebootQueueState(ctx) + disabled, err = storage.IsRebootQueueDisabled(ctx) if err != nil { t.Fatal(err) } - if state != RebootQueueStateEnabled { + if disabled { t.Error("reboot queue could not be re-enabled") } + + // rq is not running by default + running, err := storage.IsRebootQueueRunning(ctx) + if err != nil { + t.Fatal(err) + } + if running { + t.Error("reboot queue should not be running by default") + } + + // report running rq and get its state + err = storage.SetRebootQueueRunning(ctx, true) + if err != nil { + t.Fatal(err) + } + running, err = storage.IsRebootQueueRunning(ctx) + if err != nil { + t.Fatal(err) + } + if !running { + t.Error("reboot queue could not be reported running") + } + + // report not running rq and get its state + err = storage.SetRebootQueueRunning(ctx, false) + if err != nil { + t.Fatal(err) + } + running, err = storage.IsRebootQueueRunning(ctx) + if err != nil { + t.Fatal(err) + } + if running { + t.Error("reboot queue could not be reported not running") + } } func testStatus(t *testing.T) {