Skip to content

Commit

Permalink
Fix reboot_queue_running to report internal-state more precisely (#685)
Browse files Browse the repository at this point in the history
Signed-off-by: Daichi Sakaue <[email protected]>
  • Loading branch information
yokaze authored Dec 19, 2023
1 parent 7643c92 commit 1ffa827
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 11 deletions.
2 changes: 1 addition & 1 deletion docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ CKE exposes the following metrics with the Prometheus format at `/metrics` REST
| reboot_queue_enabled | True (=1) if reboot queue is enabled. | Gauge | |
| reboot_queue_entries | The number of reboot queue entries remaining. | Gauge | |
| reboot_queue_items | The number of reboot queue entries remaining per status. | Gauge | `status` |
| reboot_queue_running | True (=1) if reboot queue is enabled and the queue is not empty. | Gauge | |
| reboot_queue_running | True (=1) if reboot queue is running. | Gauge | |
| sabakan_integration_successful | True (=1) if sabakan-integration satisfies constraints. | Gauge | |
| sabakan_integration_timestamp_seconds | The Unix timestamp when `sabakan_integration_successful` was last updated. | Gauge | |
| sabakan_workers | The number of worker nodes for each role. | Gauge | `role` |
Expand Down
27 changes: 18 additions & 9 deletions metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type metricGroup struct {
// storage lists the read operations this package requires from the backing
// store. Collect reaches it through c.storage to read the reboot queue
// disabled/running flags, the reboot queue entries, and the cluster.
type storage interface {
IsSabakanDisabled(context.Context) (bool, error)
IsRebootQueueDisabled(ctx context.Context) (bool, error)
// IsRebootQueueRunning supplies the value behind the reboot_queue_running gauge.
IsRebootQueueRunning(ctx context.Context) (bool, error)
GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error)
GetCluster(ctx context.Context) (*cke.Cluster, error)
}
Expand Down Expand Up @@ -143,30 +144,38 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

rqDisabled, err := c.storage.IsRebootQueueDisabled(ctx)
disabled, err := c.storage.IsRebootQueueDisabled(ctx)
if err != nil {
log.Error("failed to get if reboot queue is enabled", map[string]interface{}{
log.FnError: err,
})
return
}
var rqEnabled float64
if !disabled {
rqEnabled = 1
}

rqEntries, err := c.storage.GetRebootsEntries(ctx)
running, err := c.storage.IsRebootQueueRunning(ctx)
if err != nil {
log.Error("failed to get reboots entries", map[string]interface{}{
log.Error("failed to get if reboot queue is running", map[string]interface{}{
log.FnError: err,
})
return
}

var rqEnabled, rqRunning float64
if !rqDisabled {
rqEnabled = 1
}
if !rqDisabled && len(rqEntries) > 0 {
var rqRunning float64
if running {
rqRunning = 1
}

rqEntries, err := c.storage.GetRebootsEntries(ctx)
if err != nil {
log.Error("failed to get reboots entries", map[string]interface{}{
log.FnError: err,
})
return
}

cluster, err := c.storage.GetCluster(ctx)
if err != nil {
log.Error("failed to get cluster", map[string]interface{}{
Expand Down
27 changes: 27 additions & 0 deletions metrics/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type updateOperationPhaseTestCase struct {
type updateRebootQueueEntriesTestCase struct {
name string
enabled bool
running bool
input []*cke.RebootQueueEntry
expectedEnabled float64
expectedRunning float64
Expand Down Expand Up @@ -256,6 +257,7 @@ func testUpdateRebootQueueEntries(t *testing.T) {
{
name: "zero",
enabled: true,
running: false,
input: nil,
expectedEnabled: 1,
expectedRunning: 0,
Expand All @@ -264,6 +266,7 @@ func testUpdateRebootQueueEntries(t *testing.T) {
{
name: "one",
enabled: true,
running: true,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
},
Expand All @@ -274,6 +277,7 @@ func testUpdateRebootQueueEntries(t *testing.T) {
{
name: "two",
enabled: true,
running: true,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
Expand All @@ -282,9 +286,22 @@ func testUpdateRebootQueueEntries(t *testing.T) {
expectedRunning: 1,
expectedEntries: 2,
},
{
name: "two-stopping",
enabled: false,
running: true,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
},
expectedEnabled: 0,
expectedRunning: 1,
expectedEntries: 2,
},
{
name: "two-disabled",
enabled: false,
running: false,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
Expand All @@ -301,6 +318,7 @@ func testUpdateRebootQueueEntries(t *testing.T) {

collector, storage := newTestCollector()
storage.enableRebootQueue(tt.enabled)
storage.setRebootQueueRunning(tt.running)
storage.setRebootsEntries(tt.input)
handler := GetHandler(collector)

Expand Down Expand Up @@ -671,6 +689,7 @@ func newTestCollector() (prometheus.Collector, *testStorage) {
// testStorage keeps, in memory, the values that the test helpers set
// (enableRebootQueue, setRebootQueueRunning, setRebootsEntries) and that its
// query methods hand back to the code under test.
type testStorage struct {
sabakanEnabled bool
// rebootQueueEnabled is assigned by enableRebootQueue.
rebootQueueEnabled bool
// rebootQueueRunning is assigned by setRebootQueueRunning and returned by
// IsRebootQueueRunning.
rebootQueueRunning bool
// rebootEntries is assigned by setRebootsEntries.
rebootEntries []*cke.RebootQueueEntry
cluster *cke.Cluster
}
Expand All @@ -691,6 +710,14 @@ func (s *testStorage) enableRebootQueue(flag bool) {
s.rebootQueueEnabled = flag
}

// IsRebootQueueRunning returns the running flag currently held by the test
// storage. The context is ignored and the error is always nil.
func (s *testStorage) IsRebootQueueRunning(_ context.Context) (bool, error) {
	running := s.rebootQueueRunning
	return running, nil
}

// setRebootQueueRunning stores flag as the value that IsRebootQueueRunning
// will return from now on.
func (s *testStorage) setRebootQueueRunning(flag bool) {
	s.rebootQueueRunning = flag
}

// setRebootsEntries replaces the reboot queue entries held by the test
// storage. The slice is stored as given, without copying.
func (s *testStorage) setRebootsEntries(entries []*cke.RebootQueueEntry) {
	s.rebootEntries = entries
}
Expand Down
12 changes: 12 additions & 0 deletions server/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,18 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
}
}

running, err := inf.Storage().IsRebootQueueRunning(ctx)
if err != nil {
return err
}
runningNext := len(rqEntries) > 0
if running != runningNext {
err := inf.Storage().SetRebootQueueRunning(ctx, runningNext)
if err != nil {
return err
}
}

nf := NewNodeFilter(cluster, status)
apiServers := map[string]bool{}
for _, node := range nf.ControlPlane() {
Expand Down
26 changes: 26 additions & 0 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
KeyConstraints = "constraints"
KeyLeader = "leader/"
KeyRebootsDisabled = "reboots/disabled"
KeyRebootsRunning = "reboots/running"
KeyRebootsPrefix = "reboots/data/"
KeyRebootsWriteIndex = "reboots/write-index"
KeyRecords = "records/"
Expand Down Expand Up @@ -705,6 +706,31 @@ func (s Storage) EnableRebootQueue(ctx context.Context, flag bool) error {
return err
}

// IsRebootQueueRunning reports whether the value stored under
// KeyRebootsRunning is exactly "true". When nothing is stored under the key
// it reports false; an error from Get is returned unchanged.
func (s Storage) IsRebootQueueRunning(ctx context.Context) (bool, error) {
	resp, err := s.Get(ctx, KeyRebootsRunning)
	switch {
	case err != nil:
		return false, err
	case resp.Count == 0:
		// No value came back for the key: treat the queue as not running.
		return false, nil
	}

	stored := resp.Kvs[0].Value
	return bytes.Equal(stored, []byte("true")), nil
}

// SetRebootQueueRunning records the running state of the reboot queue by
// writing "true" or "false" under KeyRebootsRunning. It returns the error
// from Put, if any.
func (s Storage) SetRebootQueueRunning(ctx context.Context, flag bool) error {
	state := "false"
	if flag {
		state = "true"
	}

	_, err := s.Put(ctx, KeyRebootsRunning, state)
	return err
}

// rebootsEntryKey builds the storage key for the reboot queue entry with the
// given index: KeyRebootsPrefix followed by the index formatted as lowercase
// hexadecimal, zero-padded to a width of 16.
func rebootsEntryKey(index int64) string {
	return KeyRebootsPrefix + fmt.Sprintf("%016x", index)
}
Expand Down
37 changes: 36 additions & 1 deletion storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func testStorageReboot(t *testing.T) {
t.Fatal(err)
}
if disabled {
t.Error("reboot queue should not be disabled by default")
t.Error("reboot queue should be enabled by default")
}

// disable rq and get its state
Expand Down Expand Up @@ -776,6 +776,41 @@ func testStorageReboot(t *testing.T) {
if disabled {
t.Error("reboot queue could not be re-enabled")
}

// rq is not running by default
running, err := storage.IsRebootQueueRunning(ctx)
if err != nil {
t.Fatal(err)
}
if running {
t.Error("reboot queue should not be running by default")
}

// report running rq and get its state
err = storage.SetRebootQueueRunning(ctx, true)
if err != nil {
t.Fatal(err)
}
running, err = storage.IsRebootQueueRunning(ctx)
if err != nil {
t.Fatal(err)
}
if !running {
t.Error("reboot queue could not be reported running")
}

// report not running rq and get its state
err = storage.SetRebootQueueRunning(ctx, false)
if err != nil {
t.Fatal(err)
}
running, err = storage.IsRebootQueueRunning(ctx)
if err != nil {
t.Fatal(err)
}
if running {
t.Error("reboot queue could not be reported not running")
}
}

func testStatus(t *testing.T) {
Expand Down

0 comments on commit 1ffa827

Please sign in to comment.