Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry eviction #633

Merged
merged 4 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/
vendor/

# Editors
*~
Expand Down
8 changes: 8 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ type Reboot struct {
CommandTimeoutSeconds *int `json:"command_timeout_seconds,omitempty"`
CommandRetries *int `json:"command_retries"`
CommandInterval *int `json:"command_interval"`
EvictRetries *int `json:"evict_retries"`
EvictInterval *int `json:"evict_interval"`
ProtectedNamespaces *metav1.LabelSelector `json:"protected_namespaces,omitempty"`
}

Expand Down Expand Up @@ -495,6 +497,12 @@ func validateReboot(reboot Reboot) error {
if reboot.CommandInterval != nil && *reboot.CommandInterval < 0 {
return errors.New("command_interval must not be negative")
}
if reboot.EvictRetries != nil && *reboot.EvictRetries < 0 {
return errors.New("evict_retries must not be negative")
}
if reboot.EvictInterval != nil && *reboot.EvictInterval < 0 {
return errors.New("evict_interval must not be negative")
}
if reboot.MaxConcurrentReboots != nil && *reboot.MaxConcurrentReboots <= 0 {
return errors.New("max_concurrent_reboots must be positive")
}
Expand Down
12 changes: 12 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ func testClusterYAML(t *testing.T) {
if *c.Reboot.CommandInterval != 30 {
t.Error(`*c.Reboot.CommandInterval != 30`)
}
if c.Reboot.EvictRetries == nil {
t.Fatal(`c.Reboot.EvictRetries == nil`)
}
if *c.Reboot.EvictRetries != 10 {
t.Error(`*c.Reboot.EvictRetries != 10`)
}
if c.Reboot.EvictInterval == nil {
t.Fatal(`c.Reboot.EvictInterval == nil`)
}
if *c.Reboot.EvictInterval != 3 {
t.Error(`*c.Reboot.EvictInterval != 3`)
}
if c.Reboot.ProtectedNamespaces == nil {
t.Fatal(`c.Reboot.ProtectedNamespaces == nil`)
}
Expand Down
5 changes: 4 additions & 1 deletion docs/cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ Reboot
------

| Name | Required | Type | Description |
| -------------------------- | -------- | -------------------------------- | ----------------------------------------------------------------------- |
|----------------------------| -------- | -------------------------------- |-------------------------------------------------------------------------|
| `reboot_command` | true | array | A command to reboot. List of strings. |
| `boot_check_command` | true | array | A command to check nodes booted. List of strings. |
| `eviction_timeout_seconds` | false | *int | Deadline for eviction. Must be positive. Default: 600 (10 minutes). |
| `command_timeout_seconds` | false | *int | Deadline for rebooting. Zero means infinity. Default: wait indefinitely |
| `command_retries` | false | *int | Number of reboot retries, not including initial attempt. Default: 0 |
| `command_interval` | false | *int | Interval of time between reboot retries in seconds. Default: 0 |
| `evict_retries` | false | *int | Number of eviction retries, not including initial attempt. Default: 0 |
| `evict_interval` | false | *int | Interval of time between eviction retries in seconds. Default: 0 |
| `max_concurrent_reboots` | false | *int | Maximum number of nodes to be rebooted concurrently. Default: 1 |
| `protected_namespaces` | false | [`LabelSelector`][LabelSelector] | A label selector to protect namespaces. |

Expand All @@ -89,6 +91,7 @@ If the node is not booted yet, this command should output `false` to stdout and
If `command_timeout_seconds` is specified, the check command should return within `command_timeout_seconds` seconds, or it is considered failed.

CKE tries to delete Pods in the `protected_namespaces` gracefully with the Kubernetes eviction API.
If a call to the eviction API fails, CKE retries it up to `evict_retries` times, waiting `evict_interval` seconds between attempts.
If any of the Pods cannot be deleted, it aborts the operation.

CKE also tries to delete Pods in non-protected namespaces gracefully with the Kubernetes eviction API, but it simply deletes them if eviction is denied.
Expand Down
6 changes: 5 additions & 1 deletion mtest/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,11 @@ func connectEtcd() (*clientv3.Client, error) {
}

func getClusterStatus(cluster *cke.Cluster) (*cke.ClusterStatus, []cke.ResourceDefinition, error) {
controller := server.NewController(nil, 0, time.Hour, time.Second*2, nil, 10)
controller := server.NewController(nil, nil, &server.Config{
Interval: 0,
CertsGCInterval: time.Hour,
MaxConcurrentUpdates: 10,
})

etcd, err := connectEtcd()
if err != nil {
Expand Down
15 changes: 14 additions & 1 deletion op/reboot.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type rebootDrainStartCommand struct {
entries []*cke.RebootQueueEntry
protectedNamespaces *metav1.LabelSelector
apiserver *cke.Node
evictAttempts int
evictInterval time.Duration

notifyFailedNode func(string)
}
Expand Down Expand Up @@ -77,11 +79,22 @@ func (o *rebootDrainStartOp) NextCommand() cke.Commander {
}
o.finished = true

attempts := 1
if o.config.EvictRetries != nil {
attempts = *o.config.EvictRetries + 1
}
interval := 0 * time.Second
if o.config.EvictInterval != nil {
interval = time.Second * time.Duration(*o.config.EvictInterval)
}

return rebootDrainStartCommand{
entries: o.entries,
protectedNamespaces: o.config.ProtectedNamespaces,
apiserver: o.apiserver,
notifyFailedNode: o.notifyFailedNode,
evictAttempts: attempts,
evictInterval: interval,
}
}

Expand Down Expand Up @@ -156,7 +169,7 @@ func (c rebootDrainStartCommand) Run(ctx context.Context, inf cke.Infrastructure

// next, evict pods on each node
for _, entry := range evictNodes {
err := evictOrDeleteNodePod(ctx, cs, entry.Node, protected)
err := evictOrDeleteNodePod(ctx, cs, entry.Node, protected, c.evictAttempts, c.evictInterval)
if err != nil {
c.notifyFailedNode(entry.Node)
err = drainBackOff(ctx, inf, entry, err)
Expand Down
23 changes: 21 additions & 2 deletions op/reboot_decide.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,21 @@ func checkJobPodNotExist(ctx context.Context, cs *kubernetes.Clientset, node str
// evictOrDeleteNodePod evicts or delete Pods on the specified Node.
// It first tries eviction. If the eviction failed and the Pod's namespace is not protected, it deletes the Pod.
// If a running Job Pod exists, this function returns an error.
func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool) error {
func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration) error {
return enumeratePods(ctx, cs, node, func(pod *corev1.Pod) error {
evictCount := 0
EVICT:
log.Info("start evicting pod", map[string]interface{}{
"namespace": pod.Namespace,
"name": pod.Name,
})
err := cs.CoreV1().Pods(pod.Namespace).EvictV1(ctx, &policyv1.Eviction{
ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
})
evictCount++
switch {
case err == nil:
log.Info("start evicting pod", map[string]interface{}{
log.Info("evicted pod", map[string]interface{}{
"namespace": pod.Namespace,
"name": pod.Name,
})
Expand All @@ -105,6 +112,18 @@ func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node st
"name": pod.Name,
})
default:
if evictCount < attempts {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(interval):
}
log.Info("retry eviction of pod", map[string]interface{}{
"namespace": pod.Namespace,
"name": pod.Name,
})
goto EVICT
}
return fmt.Errorf("failed to evict pod %s/%s due to PDB: %w", pod.Namespace, pod.Name, err)
}
return nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/cke/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ func main() {
}

// Controller
controller := server.NewController(session, interval, gcInterval, timeout, addon, maxConcurrentUpdates)
controller := server.NewController(session, addon, &server.Config{
Interval: interval,
CertsGCInterval: gcInterval,
MaxConcurrentUpdates: maxConcurrentUpdates,
})
well.Go(controller.Run)

// API server
Expand Down
13 changes: 13 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package server

import "time"

// Config is the configuration for cke-server.
type Config struct {
	// Interval is the interval of the main loop.
	// It is used as the tick period of the controller's run loop.
	Interval time.Duration
	// CertsGCInterval is the interval of the certificate garbage collection.
	// It is used as the tick period of the certificate GC loop.
	CertsGCInterval time.Duration
	// MaxConcurrentUpdates is the maximum number of concurrent updates.
	// It bounds how many nodes are updated at once when deciding operations.
	MaxConcurrentUpdates int
}
19 changes: 8 additions & 11 deletions server/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,14 @@ var (

// Controller manage operations
type Controller struct {
session *concurrency.Session
interval time.Duration
certsGCInterval time.Duration
timeout time.Duration
addon Integrator
maxConcurrentUpdates int
session *concurrency.Session
addon Integrator
config *Config
}

// NewController construct controller instance
func NewController(s *concurrency.Session, interval, gcInterval, timeout time.Duration, addon Integrator, maxConcurrentUpdates int) Controller {
return Controller{s, interval, gcInterval, timeout, addon, maxConcurrentUpdates}
func NewController(s *concurrency.Session, addon Integrator, config *Config) Controller {
return Controller{s, addon, config}
}

// Run execute procedures with leader elections
Expand Down Expand Up @@ -149,7 +146,7 @@ func (c Controller) runLoop(ctx context.Context, leaderKey string) error {
case <-ctx.Done():
return ctx.Err()
}
ticker := time.NewTicker(c.interval)
ticker := time.NewTicker(c.config.Interval)
defer ticker.Stop()
for {
select {
Expand All @@ -170,7 +167,7 @@ func (c Controller) runLoop(ctx context.Context, leaderKey string) error {
return ctx.Err()
default:
}
ticker := time.NewTicker(c.certsGCInterval)
ticker := time.NewTicker(c.config.CertsGCInterval)
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -346,7 +343,7 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
DrainCompleted: drainCompleted,
DrainTimedout: drainTimedout,
RebootDequeued: rebootDequeued,
}, c.maxConcurrentUpdates)
}, c.config)

st := &cke.ServerStatus{
Phase: phase,
Expand Down
6 changes: 3 additions & 3 deletions server/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type DecideOpsRebootArgs struct {

// DecideOps returns the next operations to do and the operation phase.
// This returns nil when no operations need to be done.
func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constraints, resources []cke.ResourceDefinition, rebootArgs DecideOpsRebootArgs, maxConcurrentUpdates int) ([]cke.Operator, cke.OperationPhase) {
func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constraints, resources []cke.ResourceDefinition, rebootArgs DecideOpsRebootArgs, config *Config) ([]cke.Operator, cke.OperationPhase) {
nf := NewNodeFilter(c, cs)

// 0. Execute upgrade operation if necessary
Expand All @@ -40,7 +40,7 @@ func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
// 1. Run or restart rivers. This guarantees:
// - CKE tools image is pulled on all nodes.
// - Rivers runs on all nodes and will proxy requests only to control plane nodes.
if ops := riversOps(c, nf, maxConcurrentUpdates); len(ops) > 0 {
if ops := riversOps(c, nf, config.MaxConcurrentUpdates); len(ops) > 0 {
return ops, cke.PhaseRivers
}

Expand All @@ -65,7 +65,7 @@ func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
}

// 5. Run or restart kubernetes components.
if ops := k8sOps(c, nf, cs, maxConcurrentUpdates); len(ops) > 0 {
if ops := k8sOps(c, nf, cs, config.MaxConcurrentUpdates); len(ops) > 0 {
return ops, cke.PhaseK8sStart
}

Expand Down
6 changes: 5 additions & 1 deletion server/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2496,7 +2496,11 @@ func TestDecideOps(t *testing.T) {

for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
ops, _ := DecideOps(c.Input.Cluster, c.Input.Status, c.Input.Constraints, c.Input.Resources, c.Input.RebootArgs, 5)
ops, _ := DecideOps(c.Input.Cluster, c.Input.Status, c.Input.Constraints, c.Input.Resources, c.Input.RebootArgs, &Config{
Interval: 0,
CertsGCInterval: 0,
MaxConcurrentUpdates: 5,
})
if len(ops) == 0 && len(c.ExpectedOps) == 0 {
return
}
Expand Down
2 changes: 2 additions & 0 deletions testdata/cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ reboot:
command_timeout_seconds: 120
command_retries: 3
command_interval: 30
evict_retries: 10
evict_interval: 3
max_concurrent_reboots: 5
protected_namespaces:
matchLabels:
Expand Down