Skip to content

Commit

Permalink
Reflect comments
Browse files Browse the repository at this point in the history
Signed-off-by: Daichi Sakaue <[email protected]>
  • Loading branch information
yokaze committed Dec 14, 2023
1 parent 0d7f294 commit 201c8df
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 90 deletions.
4 changes: 2 additions & 2 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ CKE exposes the following metrics with the Prometheus format at `/metrics` REST
| node_reboot_status | The reboot status of a node. | Gauge | `node`, `status` |
| operation_phase | 1 if CKE is operating in the phase specified by the `phase` label. | Gauge | `phase` |
| operation_phase_timestamp_seconds | The Unix timestamp when `operation_phase` was last updated. | Gauge | |
| reboot_queue_enabled | True (=1) if reboot queue is enabled or stopping. | Gauge | |
| reboot_queue_enabled | True (=1) if reboot queue is enabled. | Gauge | |
| reboot_queue_entries | The number of reboot queue entries remaining. | Gauge | |
| reboot_queue_items | The number of reboot queue entries remaining per status. | Gauge | `status` |
| reboot_queue_running | True (=1) if reboot queue is enabled and the queue is not empty. | Gauge | |
| reboot_queue_running | True (=1) if reboot queue is running. | Gauge | |
| sabakan_integration_successful | True (=1) if sabakan-integration satisfies constraints. | Gauge | |
| sabakan_integration_timestamp_seconds | The Unix timestamp when `sabakan_integration_successful` was last updated. | Gauge | |
| sabakan_workers | The number of worker nodes for each role. | Gauge | `role` |
Expand Down
2 changes: 1 addition & 1 deletion docs/reboot.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ The command writes reboot queue entry(s) and increments `reboots/write-index` at

The queue is processed by CKE as follows:

1. If `reboots/state` is not `enabled`, it will not process the queue.
1. If `reboots/disabled` is `true`, it doesn't process the queue.
2. Check the reboot queue to find an entry.
- If the number of nodes under processing is less than the maximum concurrent reboots and the number of unreachable nodes that are not under this reboot process is not more than `maximum-unreachable-nodes-for-reboot` in the constraints, pick several nodes from the front of the queue and start draining them.
1. Cordon the node.
Expand Down
29 changes: 19 additions & 10 deletions metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ type metricGroup struct {
// This abstraction is for mock test.
type storage interface {
IsSabakanDisabled(context.Context) (bool, error)
GetRebootQueueState(ctx context.Context) (cke.RebootQueueState, error)
IsRebootQueueDisabled(ctx context.Context) (bool, error)
IsRebootQueueRunning(ctx context.Context) (bool, error)
GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error)
GetCluster(ctx context.Context) (*cke.Cluster, error)
}
Expand Down Expand Up @@ -143,28 +144,36 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

rqState, err := c.storage.GetRebootQueueState(ctx)
disabled, err := c.storage.IsRebootQueueDisabled(ctx)
if err != nil {
log.Error("failed to get if reboot queue is enabled", map[string]interface{}{
log.FnError: err,
})
return
}
var rqEnabled float64
if !disabled {
rqEnabled = 1
}

rqEntries, err := c.storage.GetRebootsEntries(ctx)
running, err := c.storage.IsRebootQueueRunning(ctx)
if err != nil {
log.Error("failed to get reboots entries", map[string]interface{}{
log.Error("failed to get if reboot queue is running", map[string]interface{}{
log.FnError: err,
})
return
}
var rqRunning float64
if running {
rqRunning = 1
}

var rqEnabled, rqRunning float64
if rqState == cke.RebootQueueStateEnabled || rqState == cke.RebootQueueStateStopping {
rqEnabled = 1
if len(rqEntries) > 0 {
rqRunning = 1
}
rqEntries, err := c.storage.GetRebootsEntries(ctx)
if err != nil {
log.Error("failed to get reboots entries", map[string]interface{}{
log.FnError: err,
})
return
}

cluster, err := c.storage.GetCluster(ctx)
Expand Down
54 changes: 35 additions & 19 deletions metrics/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type updateOperationPhaseTestCase struct {

type updateRebootQueueEntriesTestCase struct {
name string
state cke.RebootQueueState
enabled bool
running bool
input []*cke.RebootQueueEntry
expectedEnabled float64
expectedRunning float64
Expand Down Expand Up @@ -255,15 +256,17 @@ func testUpdateRebootQueueEntries(t *testing.T) {
testCases := []updateRebootQueueEntriesTestCase{
{
name: "zero",
state: cke.RebootQueueStateEnabled,
enabled: true,
running: false,
input: nil,
expectedEnabled: 1,
expectedRunning: 0,
expectedEntries: 0,
},
{
name: "one",
state: cke.RebootQueueStateEnabled,
name: "one",
enabled: true,
running: true,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
},
Expand All @@ -272,8 +275,9 @@ func testUpdateRebootQueueEntries(t *testing.T) {
expectedEntries: 1,
},
{
name: "two",
state: cke.RebootQueueStateEnabled,
name: "two",
enabled: true,
running: true,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
Expand All @@ -283,8 +287,9 @@ func testUpdateRebootQueueEntries(t *testing.T) {
expectedEntries: 2,
},
{
name: "two-stopping",
state: cke.RebootQueueStateStopping,
name: "two-stopping",
enabled: false,
running: true,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
Expand All @@ -294,8 +299,9 @@ func testUpdateRebootQueueEntries(t *testing.T) {
expectedEntries: 2,
},
{
name: "two-disabled",
state: cke.RebootQueueStateDisabled,
name: "two-disabled",
enabled: false,
running: false,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
Expand All @@ -311,7 +317,8 @@ func testUpdateRebootQueueEntries(t *testing.T) {
defer ctx.Done()

collector, storage := newTestCollector()
storage.setRebootQueueState(tt.state)
storage.enableRebootQueue(tt.enabled)
storage.setRebootQueueRunning(tt.running)
storage.setRebootsEntries(tt.input)
handler := GetHandler(collector)

Expand Down Expand Up @@ -680,10 +687,11 @@ func newTestCollector() (prometheus.Collector, *testStorage) {
}

type testStorage struct {
sabakanEnabled bool
rebootQueueState cke.RebootQueueState
rebootEntries []*cke.RebootQueueEntry
cluster *cke.Cluster
sabakanEnabled bool
rebootQueueEnabled bool
rebootQueueRunning bool
rebootEntries []*cke.RebootQueueEntry
cluster *cke.Cluster
}

func (s *testStorage) enableSabakan(flag bool) {
Expand All @@ -694,12 +702,20 @@ func (s *testStorage) IsSabakanDisabled(_ context.Context) (bool, error) {
return !s.sabakanEnabled, nil
}

func (s *testStorage) GetRebootQueueState(_ context.Context) (cke.RebootQueueState, error) {
return s.rebootQueueState, nil
// IsRebootQueueDisabled returns the negation of the rebootQueueEnabled
// field together with a nil error.
func (s *testStorage) IsRebootQueueDisabled(_ context.Context) (bool, error) {
	disabled := !s.rebootQueueEnabled
	return disabled, nil
}

func (s *testStorage) setRebootQueueState(state cke.RebootQueueState) {
s.rebootQueueState = state
// enableRebootQueue stores flag in the rebootQueueEnabled field, which
// IsRebootQueueDisabled later reports in negated form.
func (s *testStorage) enableRebootQueue(flag bool) {
s.rebootQueueEnabled = flag
}

// IsRebootQueueRunning returns the current value of the rebootQueueRunning
// field together with a nil error.
func (s *testStorage) IsRebootQueueRunning(_ context.Context) (bool, error) {
	running := s.rebootQueueRunning
	return running, nil
}

// setRebootQueueRunning stores flag in the rebootQueueRunning field, which
// IsRebootQueueRunning later returns unchanged.
func (s *testStorage) setRebootQueueRunning(flag bool) {
s.rebootQueueRunning = flag
}

func (s *testStorage) setRebootsEntries(entries []*cke.RebootQueueEntry) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/ckecli/cmd/reboot_queue_disable.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cmd
import (
"context"

"github.com/cybozu-go/cke"
"github.com/cybozu-go/well"
"github.com/spf13/cobra"
)
Expand All @@ -15,7 +14,7 @@ var rebootQueueDisableCmd = &cobra.Command{

RunE: func(cmd *cobra.Command, args []string) error {
well.Go(func(ctx context.Context) error {
return storage.SetRebootQueueState(ctx, cke.RebootQueueStateStopping)
return storage.EnableRebootQueue(ctx, false)
})
well.Stop()
return well.Wait()
Expand Down
3 changes: 1 addition & 2 deletions pkg/ckecli/cmd/reboot_queue_enable.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cmd
import (
"context"

"github.com/cybozu-go/cke"
"github.com/cybozu-go/well"
"github.com/spf13/cobra"
)
Expand All @@ -15,7 +14,7 @@ var rebootQueueEnableCmd = &cobra.Command{

RunE: func(cmd *cobra.Command, args []string) error {
well.Go(func(ctx context.Context) error {
return storage.SetRebootQueueState(ctx, cke.RebootQueueStateEnabled)
return storage.EnableRebootQueue(ctx, true)
})
well.Stop()
return well.Wait()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ckecli/cmd/reboot_queue_is_enabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ var rebootQueueIsEnabledCmd = &cobra.Command{

RunE: func(cmd *cobra.Command, args []string) error {
well.Go(func(ctx context.Context) error {
state, err := storage.GetRebootQueueState(ctx)
disabled, err := storage.IsRebootQueueDisabled(ctx)
if err != nil {
return err
}
fmt.Println(string(state))
fmt.Println(!disabled)
return nil
})
well.Stop()
Expand Down
13 changes: 0 additions & 13 deletions reboot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,6 @@ import (
"time"
)

// RebootQueueState is state of reboot queue
type RebootQueueState string

// RebootQueue states
const (
// Reboot queue is enabled (should be set by ckecli)
RebootQueueStateEnabled = RebootQueueState("enabled")
// Reboot queue is requested to stop (should be set by ckecli)
RebootQueueStateStopping = RebootQueueState("stopping")
// Reboot queue is disabled (should be set by CKE)
RebootQueueStateDisabled = RebootQueueState("disabled")
)

// RebootStatus is status of reboot operation
type RebootStatus string

Expand Down
26 changes: 16 additions & 10 deletions server/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,21 +317,27 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
rqEntries = cke.DedupRebootQueueEntries(rqEntries)

if len(rqEntries) > 0 {
rqState, err := inf.Storage().GetRebootQueueState(ctx)
switch {
case err != nil:
disabled, err := inf.Storage().IsRebootQueueDisabled(ctx)
if err != nil {
return err
case rqState == cke.RebootQueueStateStopping:
err := inf.Storage().SetRebootQueueState(ctx, cke.RebootQueueStateDisabled)
if err != nil {
return err
}
rqEntries = nil
case rqState == cke.RebootQueueStateDisabled:
}
if disabled {
rqEntries = nil
}
}

running, err := inf.Storage().IsRebootQueueRunning(ctx)
if err != nil {
return err
}
runningNext := len(rqEntries) > 0
if running != runningNext {
err := inf.Storage().SetRebootQueueRunning(ctx, runningNext)
if err != nil {
return err
}
}

nf := NewNodeFilter(cluster, status)
apiServers := map[string]bool{}
for _, node := range nf.ControlPlane() {
Expand Down
59 changes: 39 additions & 20 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ const (
KeyClusterRevision = "cluster-revision"
KeyConstraints = "constraints"
KeyLeader = "leader/"
KeyRebootsDisabled = "reboots/disabled" // TODO: remove this key
KeyRebootsState = "reboots/state"
KeyRebootsDisabled = "reboots/disabled"
KeyRebootsRunning = "reboots/running"
KeyRebootsPrefix = "reboots/data/"
KeyRebootsWriteIndex = "reboots/write-index"
KeyRecords = "records/"
Expand Down Expand Up @@ -680,35 +680,54 @@ func (s Storage) GetSabakanURL(ctx context.Context) (string, error) {
return s.getStringValue(ctx, KeySabakanURL)
}

// GetRebootQueueState returns reboot queue state.
func (s Storage) GetRebootQueueState(ctx context.Context) (RebootQueueState, error) {
resp, err := s.Get(ctx, KeyRebootsState)
// IsRebootQueueDisabled returns true if reboot queue is disabled.
func (s Storage) IsRebootQueueDisabled(ctx context.Context) (bool, error) {
resp, err := s.Get(ctx, KeyRebootsDisabled)
if err != nil {
// defaulted to on
return RebootQueueStateEnabled, err
return false, err
}
if resp.Count == 0 {
return RebootQueueStateEnabled, nil
return false, nil
}

return RebootQueueState(resp.Kvs[0].Value), nil
return bytes.Equal([]byte("true"), resp.Kvs[0].Value), nil
}

// SetRebootQueueState sets reboot queue state.
func (s Storage) SetRebootQueueState(ctx context.Context, state RebootQueueState) error {
// TODO: remove this temporary code
resp, err := s.Get(ctx, KeyRebootsDisabled)
// EnableRebootQueue turns reboot queue processing on (flag == true) or
// off (flag == false).
//
// The value written under KeyRebootsDisabled is the negation of flag,
// encoded as the string "true" or "false". Any error from Put is returned
// unchanged.
func (s Storage) EnableRebootQueue(ctx context.Context, flag bool) error {
	// The key expresses "disabled", so start from the disabled encoding and
	// flip it when the caller asks for the queue to be enabled.
	disabled := "true"
	if flag {
		disabled = "false"
	}
	_, err := s.Put(ctx, KeyRebootsDisabled, disabled)
	return err
}

// IsRebootQueueRunning returns true if CKE is processing reboot queue.
func (s Storage) IsRebootQueueRunning(ctx context.Context) (bool, error) {
resp, err := s.Get(ctx, KeyRebootsRunning)
if err != nil {
return err
return false, err
}
if resp.Count > 0 {
_, err := s.Delete(ctx, KeyRebootsDisabled)
if err != nil {
return err
}
if resp.Count == 0 {
return false, nil
}

_, err = s.Put(ctx, KeyRebootsState, string(state))
return bytes.Equal([]byte("true"), resp.Kvs[0].Value), nil
}

// SetRebootQueueRunning reports whether CKE is processing the reboot queue
// by writing the string "true" or "false" under KeyRebootsRunning. Any
// error from Put is returned unchanged.
func (s Storage) SetRebootQueueRunning(ctx context.Context, flag bool) error {
	running := "false"
	if flag {
		running = "true"
	}
	_, err := s.Put(ctx, KeyRebootsRunning, running)
	return err
}

Expand Down
Loading

0 comments on commit 201c8df

Please sign in to comment.