diff --git a/.gitignore b/.gitignore
index d5db6b6a..da69d433 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
 # Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
 .glide/
+vendor/
 
 # Editors
 *~
 
diff --git a/cluster.go b/cluster.go
index 2b4579bb..2a575165 100644
--- a/cluster.go
+++ b/cluster.go
@@ -277,6 +277,8 @@ type Reboot struct {
 	CommandTimeoutSeconds *int                  `json:"command_timeout_seconds,omitempty"`
 	CommandRetries        *int                  `json:"command_retries"`
 	CommandInterval       *int                  `json:"command_interval"`
+	EvictRetries          *int                  `json:"evict_retries"`
+	EvictInterval         *int                  `json:"evict_interval"`
 	ProtectedNamespaces   *metav1.LabelSelector `json:"protected_namespaces,omitempty"`
 }
 
@@ -495,6 +497,12 @@ func validateReboot(reboot Reboot) error {
 	if reboot.CommandInterval != nil && *reboot.CommandInterval < 0 {
 		return errors.New("command_interval must not be negative")
 	}
+	if reboot.EvictRetries != nil && *reboot.EvictRetries < 0 {
+		return errors.New("evict_retries must not be negative")
+	}
+	if reboot.EvictInterval != nil && *reboot.EvictInterval < 0 {
+		return errors.New("evict_interval must not be negative")
+	}
 	if reboot.MaxConcurrentReboots != nil && *reboot.MaxConcurrentReboots <= 0 {
 		return errors.New("max_concurrent_reboots must be positive")
 	}
diff --git a/cluster_test.go b/cluster_test.go
index 4eb82f98..185db19b 100644
--- a/cluster_test.go
+++ b/cluster_test.go
@@ -110,6 +110,18 @@ func testClusterYAML(t *testing.T) {
 	if *c.Reboot.CommandInterval != 30 {
 		t.Error(`*c.Reboot.CommandInterval != 30`)
 	}
+	if c.Reboot.EvictRetries == nil {
+		t.Fatal(`c.Reboot.EvictRetries == nil`)
+	}
+	if *c.Reboot.EvictRetries != 10 {
+		t.Error(`*c.Reboot.EvictRetries != 10`)
+	}
+	if c.Reboot.EvictInterval == nil {
+		t.Fatal(`c.Reboot.EvictInterval == nil`)
+	}
+	if *c.Reboot.EvictInterval != 3 {
+		t.Error(`*c.Reboot.EvictInterval != 3`)
+	}
 	if c.Reboot.ProtectedNamespaces == nil {
 		t.Fatal(`c.Reboot.ProtectedNamespaces == nil`)
 	}
diff --git a/docs/cluster.md b/docs/cluster.md
index c60e01a1..dc1caa63 100644
--- a/docs/cluster.md
+++ b/docs/cluster.md
@@ -68,13 +68,15 @@ Reboot
 ------
 
 | Name                       | Required | Type                             | Description |
-| -------------------------- | -------- | -------------------------------- | ----------------------------------------------------------------------- |
+|----------------------------| -------- | -------------------------------- |-------------------------------------------------------------------------|
 | `reboot_command`           | true     | array                            | A command to reboot. List of strings. |
 | `boot_check_command`       | true     | array                            | A command to check nodes booted. List of strings. |
 | `eviction_timeout_seconds` | false    | *int                             | Deadline for eviction. Must be positive. Default: 600 (10 minutes). |
 | `command_timeout_seconds`  | false    | *int                             | Deadline for rebooting. Zero means infinity. Default: wait indefinitely |
 | `command_retries`          | false    | *int                             | Number of reboot retries, not including initial attempt. Default: 0 |
 | `command_interval`         | false    | *int                             | Interval of time between reboot retries in seconds. Default: 0 |
+| `evict_retries`            | false    | *int                             | Number of eviction retries, not including initial attempt. Default: 0 |
+| `evict_interval`           | false    | *int                             | Interval of time between eviction retries in seconds. Default: 0 |
 | `max_concurrent_reboots`   | false    | *int                             | Maximum number of nodes to be rebooted concurrently. Default: 1 |
 | `protected_namespaces`     | false    | [`LabelSelector`][LabelSelector] | A label selector to protect namespaces. |
 
@@ -89,6 +91,7 @@ If the node is not booted yet, this command should output `false` to stdout and
 If `command_timeout_seconds` is specified, the check command should return within `command_timeout_seconds` seconds, or it is considered failed.
 
 CKE tries to delete Pods in the `protected_namespaces` gracefully with the Kubernetes eviction API.
+If the eviction API fails, CKE retries it up to `evict_retries` times with an interval of `evict_interval` seconds.
 If any of the Pods cannot be deleted, it aborts the operation.
 
 The Pods in the non-protected namespaces are also tried to be deleted gracefully with the Kubernetes eviction API, but they would be simply deleted if eviction is denied.
diff --git a/mtest/run_test.go b/mtest/run_test.go
index efe3d372..3e9b9a01 100644
--- a/mtest/run_test.go
+++ b/mtest/run_test.go
@@ -341,7 +341,11 @@ func connectEtcd() (*clientv3.Client, error) {
 }
 
 func getClusterStatus(cluster *cke.Cluster) (*cke.ClusterStatus, []cke.ResourceDefinition, error) {
-	controller := server.NewController(nil, 0, time.Hour, time.Second*2, nil, 10)
+	controller := server.NewController(nil, nil, &server.Config{
+		Interval:             0,
+		CertsGCInterval:      time.Hour,
+		MaxConcurrentUpdates: 10,
+	})
 	etcd, err := connectEtcd()
 	if err != nil {
diff --git a/op/reboot.go b/op/reboot.go
index 92abc3d4..2a953307 100644
--- a/op/reboot.go
+++ b/op/reboot.go
@@ -42,6 +42,8 @@ type rebootDrainStartCommand struct {
 	entries             []*cke.RebootQueueEntry
 	protectedNamespaces *metav1.LabelSelector
 	apiserver           *cke.Node
+	evictAttempts       int
+	evictInterval       time.Duration
 	notifyFailedNode    func(string)
 }
 
@@ -77,11 +79,22 @@ func (o *rebootDrainStartOp) NextCommand() cke.Commander {
 	}
 	o.finished = true
 
+	attempts := 1
+	if o.config.EvictRetries != nil {
+		attempts = *o.config.EvictRetries + 1
+	}
+	interval := 0 * time.Second
+	if o.config.EvictInterval != nil {
+		interval = time.Second * time.Duration(*o.config.EvictInterval)
+	}
+
 	return rebootDrainStartCommand{
 		entries:             o.entries,
 		protectedNamespaces: o.config.ProtectedNamespaces,
 		apiserver:           o.apiserver,
 		notifyFailedNode:    o.notifyFailedNode,
+		evictAttempts:       attempts,
+		evictInterval:       interval,
 	}
 }
 
@@ -156,7 +169,7 @@ func (c rebootDrainStartCommand) Run(ctx context.Context, inf cke.Infrastructure
 
 	// next, evict pods on each node
 	for _, entry := range evictNodes {
-		err := evictOrDeleteNodePod(ctx, cs, entry.Node, protected)
+		err := evictOrDeleteNodePod(ctx, cs, entry.Node, protected, c.evictAttempts, c.evictInterval)
 		if err != nil {
 			c.notifyFailedNode(entry.Node)
 			err = drainBackOff(ctx, inf, entry, err)
diff --git a/op/reboot_decide.go b/op/reboot_decide.go
index ecb53f48..14e0bb7a 100644
--- a/op/reboot_decide.go
+++ b/op/reboot_decide.go
@@ -74,14 +74,21 @@ func checkJobPodNotExist(ctx context.Context, cs *kubernetes.Clientset, node str
 // evictOrDeleteNodePod evicts or delete Pods on the specified Node.
 // It first tries eviction. If the eviction failed and the Pod's namespace is not protected, it deletes the Pod.
 // If a running Job Pod exists, this function returns an error.
-func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool) error {
+func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration) error {
 	return enumeratePods(ctx, cs, node, func(pod *corev1.Pod) error {
+		evictCount := 0
+	EVICT:
+		log.Info("start evicting pod", map[string]interface{}{
+			"namespace": pod.Namespace,
+			"name":      pod.Name,
+		})
 		err := cs.CoreV1().Pods(pod.Namespace).EvictV1(ctx, &policyv1.Eviction{
 			ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
 		})
+		evictCount++
 		switch {
 		case err == nil:
-			log.Info("start evicting pod", map[string]interface{}{
+			log.Info("evicted pod", map[string]interface{}{
 				"namespace": pod.Namespace,
 				"name":      pod.Name,
 			})
@@ -105,6 +112,18 @@ func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node st
 				"name":      pod.Name,
 			})
 		default:
+			if evictCount < attempts {
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-time.After(interval):
+				}
+				log.Info("retry eviction of pod", map[string]interface{}{
+					"namespace": pod.Namespace,
+					"name":      pod.Name,
+				})
+				goto EVICT
+			}
 			return fmt.Errorf("failed to evict pod %s/%s due to PDB: %w", pod.Namespace, pod.Name, err)
 		}
 		return nil
diff --git a/pkg/cke/main.go b/pkg/cke/main.go
index df322eb3..a572de4d 100644
--- a/pkg/cke/main.go
+++ b/pkg/cke/main.go
@@ -116,7 +116,11 @@ func main() {
 	}
 
 	// Controller
-	controller := server.NewController(session, interval, gcInterval, timeout, addon, maxConcurrentUpdates)
+	controller := server.NewController(session, addon, &server.Config{
+		Interval:             interval,
+		CertsGCInterval:      gcInterval,
+		MaxConcurrentUpdates: maxConcurrentUpdates,
+	})
 	well.Go(controller.Run)
 
 	// API server
diff --git a/server/config.go b/server/config.go
new file mode 100644
index 00000000..dab7dbc9
--- /dev/null
+++ b/server/config.go
@@ -0,0 +1,13 @@
+package server
+
+import "time"
+
+// Config is the configuration for cke-server.
+type Config struct {
+	// Interval is the interval of the main loop.
+	Interval time.Duration
+	// CertsGCInterval is the interval of the certificate garbage collection.
+	CertsGCInterval time.Duration
+	// MaxConcurrentUpdates is the maximum number of concurrent updates.
+	MaxConcurrentUpdates int
+}
diff --git a/server/control.go b/server/control.go
index bcd34eec..4507ea2e 100644
--- a/server/control.go
+++ b/server/control.go
@@ -23,17 +23,14 @@ var (
 
 // Controller manage operations
 type Controller struct {
-	session              *concurrency.Session
-	interval             time.Duration
-	certsGCInterval      time.Duration
-	timeout              time.Duration
-	addon                Integrator
-	maxConcurrentUpdates int
+	session *concurrency.Session
+	addon   Integrator
+	config  *Config
 }
 
 // NewController construct controller instance
-func NewController(s *concurrency.Session, interval, gcInterval, timeout time.Duration, addon Integrator, maxConcurrentUpdates int) Controller {
-	return Controller{s, interval, gcInterval, timeout, addon, maxConcurrentUpdates}
+func NewController(s *concurrency.Session, addon Integrator, config *Config) Controller {
+	return Controller{s, addon, config}
 }
 
 // Run execute procedures with leader elections
@@ -149,7 +146,7 @@ func (c Controller) runLoop(ctx context.Context, leaderKey string) error {
 		case <-ctx.Done():
 			return ctx.Err()
 		}
-		ticker := time.NewTicker(c.interval)
+		ticker := time.NewTicker(c.config.Interval)
 		defer ticker.Stop()
 		for {
 			select {
@@ -170,7 +167,7 @@ func (c Controller) runLoop(ctx context.Context, leaderKey string) error {
 			return ctx.Err()
 		default:
 		}
-		ticker := time.NewTicker(c.certsGCInterval)
+		ticker := time.NewTicker(c.config.CertsGCInterval)
 		defer ticker.Stop()
 		for {
 			select {
@@ -346,7 +343,7 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
 		DrainCompleted: drainCompleted,
 		DrainTimedout:  drainTimedout,
 		RebootDequeued: rebootDequeued,
-	}, c.maxConcurrentUpdates)
+	}, c.config)
 
 	st := &cke.ServerStatus{
 		Phase: phase,
diff --git a/server/strategy.go b/server/strategy.go
index 852ba474..2aeb3d74 100644
--- a/server/strategy.go
+++ b/server/strategy.go
@@ -24,7 +24,7 @@ type DecideOpsRebootArgs struct {
 
 // DecideOps returns the next operations to do and the operation phase.
 // This returns nil when no operations need to be done.
-func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constraints, resources []cke.ResourceDefinition, rebootArgs DecideOpsRebootArgs, maxConcurrentUpdates int) ([]cke.Operator, cke.OperationPhase) {
+func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constraints, resources []cke.ResourceDefinition, rebootArgs DecideOpsRebootArgs, config *Config) ([]cke.Operator, cke.OperationPhase) {
 	nf := NewNodeFilter(c, cs)
 
 	// 0. Execute upgrade operation if necessary
@@ -40,7 +40,7 @@ func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
 	// 1. Run or restart rivers. This guarantees:
 	//    - CKE tools image is pulled on all nodes.
 	//    - Rivers runs on all nodes and will proxy requests only to control plane nodes.
-	if ops := riversOps(c, nf, maxConcurrentUpdates); len(ops) > 0 {
+	if ops := riversOps(c, nf, config.MaxConcurrentUpdates); len(ops) > 0 {
 		return ops, cke.PhaseRivers
 	}
 
@@ -65,7 +65,7 @@ func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
 	}
 
 	// 5. Run or restart kubernetes components.
-	if ops := k8sOps(c, nf, cs, maxConcurrentUpdates); len(ops) > 0 {
+	if ops := k8sOps(c, nf, cs, config.MaxConcurrentUpdates); len(ops) > 0 {
 		return ops, cke.PhaseK8sStart
 	}
 
diff --git a/server/strategy_test.go b/server/strategy_test.go
index 602d7f17..909ce59d 100644
--- a/server/strategy_test.go
+++ b/server/strategy_test.go
@@ -2496,7 +2496,11 @@ func TestDecideOps(t *testing.T) {
 	for _, c := range cases {
 		t.Run(c.Name, func(t *testing.T) {
-			ops, _ := DecideOps(c.Input.Cluster, c.Input.Status, c.Input.Constraints, c.Input.Resources, c.Input.RebootArgs, 5)
+			ops, _ := DecideOps(c.Input.Cluster, c.Input.Status, c.Input.Constraints, c.Input.Resources, c.Input.RebootArgs, &Config{
+				Interval:             0,
+				CertsGCInterval:      0,
+				MaxConcurrentUpdates: 5,
+			})
 			if len(ops) == 0 && len(c.ExpectedOps) == 0 {
 				return
 			}
diff --git a/testdata/cluster.yaml b/testdata/cluster.yaml
index f404e90d..2c73026f 100644
--- a/testdata/cluster.yaml
+++ b/testdata/cluster.yaml
@@ -17,6 +17,8 @@ reboot:
   command_timeout_seconds: 120
   command_retries: 3
   command_interval: 30
+  evict_retries: 10
+  evict_interval: 3
   max_concurrent_reboots: 5
   protected_namespaces:
     matchLabels:
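
Note on the new `evict_retries` / `evict_interval` settings documented in docs/cluster.md above: the sketch below is a standalone illustration, not code from this patch; `rebootConfig`, `evictWithRetry`, and the sample eviction function are hypothetical names. It shows how the optional fields are turned into an attempt count and a wait interval, mirroring the conversion added to rebootDrainStartOp.NextCommand and the retry loop added to evictOrDeleteNodePod. The real code waits with a select on the context instead of sleeping, and only reaches the retry path when the eviction API keeps rejecting the request (typically because of a PodDisruptionBudget).

package main

import (
	"errors"
	"fmt"
	"time"
)

// rebootConfig mirrors only the two new fields of cke.Reboot.
type rebootConfig struct {
	EvictRetries  *int // retries after the first attempt; nil means no retry
	EvictInterval *int // seconds to wait between attempts; nil means no wait
}

// evictWithRetry applies the same "retries + 1 = attempts" conversion as the patch.
func evictWithRetry(cfg rebootConfig, evict func() error) error {
	attempts := 1
	if cfg.EvictRetries != nil {
		attempts = *cfg.EvictRetries + 1
	}
	interval := 0 * time.Second
	if cfg.EvictInterval != nil {
		interval = time.Duration(*cfg.EvictInterval) * time.Second
	}

	var err error
	for i := 0; i < attempts; i++ {
		if err = evict(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(interval) // the real code selects on ctx.Done() and time.After()
		}
	}
	return fmt.Errorf("eviction failed after %d attempts: %w", attempts, err)
}

func main() {
	retries, interval := 10, 3 // the values used in testdata/cluster.yaml
	cfg := rebootConfig{EvictRetries: &retries, EvictInterval: &interval}

	calls := 0
	err := evictWithRetry(cfg, func() error {
		calls++
		if calls < 3 {
			return errors.New("pod disruption budget violated")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}

With `evict_retries: 10` and `evict_interval: 3`, a Pod whose eviction keeps being rejected is therefore attempted 11 times over roughly 30 seconds before the drain of that node is reported as failed.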
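
On the server.Config refactoring: NewController now takes a session, an Integrator, and a *Config instead of six positional arguments, and the old timeout argument is dropped (no Config field replaces it). A minimal construction sketch follows; the wiring is hypothetical, it assumes the import path github.com/cybozu-go/cke/server, and the duration values are placeholders, not defaults.

package main

import (
	"time"

	"github.com/cybozu-go/cke/server"
)

func main() {
	// A nil session and nil Integrator are accepted here just as in
	// mtest/run_test.go; a real deployment passes the etcd concurrency
	// session and its addon, as pkg/cke/main.go does.
	c := server.NewController(nil, nil, &server.Config{
		Interval:             10 * time.Second, // main-loop tick (placeholder)
		CertsGCInterval:      time.Hour,        // certificate GC tick (placeholder)
		MaxConcurrentUpdates: 10,               // cap on concurrently updated nodes
	})
	_ = c // pkg/cke/main.go starts the loop with well.Go(controller.Run)
}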