From 86586ba05fd90d7e0281a776758f4a7c4c51cde4 Mon Sep 17 00:00:00 2001 From: Abhinav Pandey Date: Wed, 21 Feb 2024 12:07:10 -0800 Subject: [PATCH] Refactor in-place upgrade cleanup logic (#7656) --- controllers/kubeadmcontrolplane_controller.go | 68 ++++------ .../kubeadmcontrolplane_controller_test.go | 123 ++++++++--------- controllers/machinedeployment_controller.go | 83 ++++++------ .../machinedeployment_controller_test.go | 126 ++++++++---------- 4 files changed, 169 insertions(+), 231 deletions(-) diff --git a/controllers/kubeadmcontrolplane_controller.go b/controllers/kubeadmcontrolplane_controller.go index 9ec5d2daecfa..14602e278ed2 100644 --- a/controllers/kubeadmcontrolplane_controller.go +++ b/controllers/kubeadmcontrolplane_controller.go @@ -116,9 +116,6 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr. return ctrl.Result{}, err } - cpUpgrade := &anywherev1.ControlPlaneUpgrade{} - cpuGetErr := r.client.Get(ctx, GetNamespacedNameType(cpUpgradeName(kcp.Name), constants.EksaSystemNamespace), cpUpgrade) - mhc := &clusterv1.MachineHealthCheck{} if err := r.client.Get(ctx, GetNamespacedNameType(cpMachineHealthCheckName(kcp.Name), constants.EksaSystemNamespace), mhc); err != nil { if apierrors.IsNotFound(err) { @@ -131,8 +128,10 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr. return ctrl.Result{}, err } - if kcp.Spec.Replicas != nil && (*kcp.Spec.Replicas == kcp.Status.UpdatedReplicas) { - if cpuGetErr == nil && cpUpgrade.Status.Ready { + cpUpgrade := &anywherev1.ControlPlaneUpgrade{} + cpuGetErr := r.client.Get(ctx, GetNamespacedNameType(cpUpgradeName(kcp.Name), constants.EksaSystemNamespace), cpUpgrade) + if cpuGetErr == nil { + if cpUpgrade.Status.Ready && kcp.Status.Version != nil && *kcp.Status.Version == cpUpgrade.Spec.KubernetesVersion { log.Info("Control plane upgrade complete, deleting object", "ControlPlaneUpgrade", cpUpgrade.Name) if err := r.client.Delete(ctx, cpUpgrade); err != nil { return ctrl.Result{}, fmt.Errorf("deleting ControlPlaneUpgrade object: %v", err) @@ -141,55 +140,38 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr. if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil { return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err) } - } else if !apierrors.IsNotFound(cpuGetErr) { - return ctrl.Result{}, fmt.Errorf("getting ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, cpuGetErr) - } - log.Info("KubeadmControlPlane is ready, removing the \"in-place-upgrade-needed\" annotation") - // Remove the in-place-upgrade-needed annotation only after the ControlPlaneUpgrade object is deleted - delete(kcp.Annotations, kcpInPlaceUpgradeNeededAnnotation) + log.Info("KubeadmControlPlane is ready, removing the \"in-place-upgrade-needed\" annotation") + // Remove the in-place-upgrade-needed annotation only after the ControlPlaneUpgrade object is deleted + delete(kcp.Annotations, kcpInPlaceUpgradeNeededAnnotation) + } return ctrl.Result{}, nil } - if cpuGetErr != nil { - if apierrors.IsNotFound(cpuGetErr) { - log.Info("Creating ControlPlaneUpgrade object") - machines, err := r.machinesToUpgrade(ctx, kcp) - if err != nil { - return ctrl.Result{}, fmt.Errorf("retrieving list of control plane machines: %v", err) - } - cpUpgrade, err := controlPlaneUpgrade(kcp, machines) - if err != nil { - return ctrl.Result{}, fmt.Errorf("generating ControlPlaneUpgrade: %v", err) - } + if apierrors.IsNotFound(cpuGetErr) { + log.Info("Creating ControlPlaneUpgrade object") + machines, err := r.machinesToUpgrade(ctx, kcp) + if err != nil { + return ctrl.Result{}, fmt.Errorf("retrieving list of control plane machines: %v", err) + } + cpUpgrade, err := controlPlaneUpgrade(kcp, machines) + if err != nil { + return ctrl.Result{}, fmt.Errorf("generating ControlPlaneUpgrade: %v", err) + } - log.Info("Pausing control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name)) - if err := pauseMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil { - return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err) - } + log.Info("Pausing control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name)) + if err := pauseMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil { + return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err) + } - if err := r.client.Create(ctx, cpUpgrade); client.IgnoreAlreadyExists(err) != nil { - return ctrl.Result{}, fmt.Errorf("failed to create ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err) - } - return ctrl.Result{}, nil + if err := r.client.Create(ctx, cpUpgrade); client.IgnoreAlreadyExists(err) != nil { + return ctrl.Result{}, fmt.Errorf("failed to create ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err) } - return ctrl.Result{}, fmt.Errorf("getting ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, cpuGetErr) - } - if !cpUpgrade.Status.Ready { return ctrl.Result{}, nil } - log.Info("Control plane upgrade complete, deleting object", "ControlPlaneUpgrade", cpUpgrade.Name) - if err := r.client.Delete(ctx, cpUpgrade); err != nil { - return ctrl.Result{}, fmt.Errorf("deleting ControlPlaneUpgrade object: %v", err) - } - - log.Info("Resuming control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name)) - if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil { - return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err) - } + return ctrl.Result{}, fmt.Errorf("getting ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err) - return ctrl.Result{}, nil } func (r *KubeadmControlPlaneReconciler) inPlaceUpgradeNeeded(kcp *controlplanev1.KubeadmControlPlane) bool { diff --git a/controllers/kubeadmcontrolplane_controller_test.go b/controllers/kubeadmcontrolplane_controller_test.go index a7f42b56aa43..7670b20fb34b 100644 --- a/controllers/kubeadmcontrolplane_controller_test.go +++ b/controllers/kubeadmcontrolplane_controller_test.go @@ -9,6 +9,7 @@ import ( "time" . "github.com/onsi/gomega" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -24,6 +25,11 @@ import ( "github.com/aws/eks-anywhere/pkg/constants" ) +const ( + kcpInPlaceAnnotation = "controlplane.clusters.x-k8s.io/in-place-upgrade-needed" + capiPausedAnnotation = "cluster.x-k8s.io/paused" +) + type kcpObjects struct { machines []*clusterv1.Machine cpUpgrade *anywherev1.ControlPlaneUpgrade @@ -39,31 +45,12 @@ func TestKCPSetupWithManager(t *testing.T) { g.Expect(r.SetupWithManager(env.Manager())).To(Succeed()) } -func TestKCPReconcile(t *testing.T) { - g := NewWithT(t) - ctx := context.Background() - kcpObjs := getObjectsForKCP() - - runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc} - client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build() - r := controllers.NewKubeadmControlPlaneReconciler(client) - req := kcpRequest(kcpObjs.kcp) - _, err := r.Reconcile(ctx, req) - g.Expect(err).ToNot(HaveOccurred()) - - cpu := &anywherev1.ControlPlaneUpgrade{} - err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu) - g.Expect(err).ToNot(HaveOccurred()) -} - -func TestKCPReconcileComplete(t *testing.T) { +func TestKCPReconcileNotNeeded(t *testing.T) { g := NewWithT(t) ctx := context.Background() kcpObjs := getObjectsForKCP() - count := int32(len(kcpObjs.machines)) - kcpObjs.kcp.Spec.Replicas = pointer.Int32(count) - kcpObjs.kcp.Status.UpdatedReplicas = count + delete(kcpObjs.kcp.Annotations, kcpInPlaceAnnotation) runtimeObjs := []runtime.Object{kcpObjs.kcp, kcpObjs.mhc} client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build() @@ -72,42 +59,27 @@ func TestKCPReconcileComplete(t *testing.T) { _, err := r.Reconcile(ctx, req) g.Expect(err).ToNot(HaveOccurred()) - kcp := &controlplanev1.KubeadmControlPlane{} - err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed")) - mhc := &clusterv1.MachineHealthCheck{} err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) g.Expect(err).ToNot(HaveOccurred()) - g.Eventually(func(g Gomega) error { - func(g Gomega) { - g.Expect(mhc.Annotations).To(HaveKey("cluster.x-k8s.io/paused")) - }(g) - - return nil - }) - g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused")) + g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation)) } -func TestKCPReconcileNotNeeded(t *testing.T) { +func TestKCPReconcile(t *testing.T) { g := NewWithT(t) ctx := context.Background() kcpObjs := getObjectsForKCP() - delete(kcpObjs.kcp.Annotations, "controlplane.clusters.x-k8s.io/in-place-upgrade-needed") - - runtimeObjs := []runtime.Object{kcpObjs.kcp, kcpObjs.mhc} + runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc} client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build() r := controllers.NewKubeadmControlPlaneReconciler(client) req := kcpRequest(kcpObjs.kcp) _, err := r.Reconcile(ctx, req) g.Expect(err).ToNot(HaveOccurred()) - mhc := &clusterv1.MachineHealthCheck{} - err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) + cpu := &anywherev1.ControlPlaneUpgrade{} + err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused")) } func TestKCPReconcileCreateControlPlaneUpgrade(t *testing.T) { @@ -136,14 +108,15 @@ func TestKCPReconcileCreateControlPlaneUpgrade(t *testing.T) { mhc := &clusterv1.MachineHealthCheck{} err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(mhc.Annotations).To(HaveKey("cluster.x-k8s.io/paused")) + g.Expect(mhc.Annotations).To(HaveKey(capiPausedAnnotation)) } -func TestKCPReconcileControlPlaneUpgradeReady(t *testing.T) { +func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) { g := NewWithT(t) ctx := context.Background() kcpObjs := getObjectsForKCP() + kcpObjs.kcp.Status.Version = &kcpObjs.kcp.Spec.Version kcpObjs.cpUpgrade.Status.Ready = true runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc} @@ -156,68 +129,76 @@ func TestKCPReconcileControlPlaneUpgradeReady(t *testing.T) { cpu := &anywherev1.ControlPlaneUpgrade{} err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu) g.Expect(err).To(HaveOccurred()) + g.Expect(err).To(MatchError("controlplaneupgrades.anywhere.eks.amazonaws.com \"my-cluster-cp-upgrade\" not found")) + + kcp := &controlplanev1.KubeadmControlPlane{} + err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(kcp.Annotations).ToNot(HaveKey(kcpInPlaceAnnotation)) mhc := &clusterv1.MachineHealthCheck{} err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused")) + g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation)) } -func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) { +func TestKCPReconcileFullFlow(t *testing.T) { g := NewWithT(t) ctx := context.Background() kcpObjs := getObjectsForKCP() - kcpObjs.kcp.Status.UpdatedReplicas = *kcpObjs.kcp.Spec.Replicas - kcpObjs.cpUpgrade.Status.Ready = true - - runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc} + runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp, kcpObjs.mhc} client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build() r := controllers.NewKubeadmControlPlaneReconciler(client) req := kcpRequest(kcpObjs.kcp) _, err := r.Reconcile(ctx, req) g.Expect(err).ToNot(HaveOccurred()) + // Expect ControlPlaneUpgrade object to be created and not ready cpu := &anywherev1.ControlPlaneUpgrade{} err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu) - g.Expect(err).To(HaveOccurred()) - g.Expect(err).To(MatchError("controlplaneupgrades.anywhere.eks.amazonaws.com \"my-cluster-cp-upgrade\" not found")) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(cpu.Status.Ready).To(BeFalse()) + // Expect KCP to still have in-place annotation kcp := &controlplanev1.KubeadmControlPlane{} err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed")) + g.Expect(kcp.Annotations).To(HaveKey(kcpInPlaceAnnotation)) + // Expect MHC for KCP to be paused mhc := &clusterv1.MachineHealthCheck{} err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused")) -} + g.Expect(mhc.Annotations).To(HaveKey(capiPausedAnnotation)) -func TestKCPReconcileKCPReadyAndCPUpgradeAlreadyDeleted(t *testing.T) { - g := NewWithT(t) - ctx := context.Background() - kcpObjs := getObjectsForKCP() - - kcpObjs.kcp.Status.UpdatedReplicas = *kcpObjs.kcp.Spec.Replicas + // Mark ControlPlaneUpgrade as ready and update KCP status K8s version + cpu.Status.Ready = true + err = client.Update(ctx, cpu) + g.Expect(err).ToNot(HaveOccurred()) + kcp.Status.Version = &kcp.Spec.Version + err = client.Update(ctx, kcp) + g.Expect(err).ToNot(HaveOccurred()) - runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp, kcpObjs.mhc} - client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build() - r := controllers.NewKubeadmControlPlaneReconciler(client) - req := kcpRequest(kcpObjs.kcp) - _, err := r.Reconcile(ctx, req) + // trigger another reconcile loop + req = kcpRequest(kcp) + _, err = r.Reconcile(ctx, req) g.Expect(err).ToNot(HaveOccurred()) - // verify the in-place-upgrade-needed annotation is removed even when the ControlPlaneUpgrade object is not found - kcp := &controlplanev1.KubeadmControlPlane{} + // Expect ControlPlaneUpgrade object to be deleted + err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu) + g.Expect(err).To(HaveOccurred()) + g.Expect(apierrors.IsNotFound(err)).To(BeTrue()) + + // Expect KCP to no longer have in-place annotation err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed")) + g.Expect(kcp.Annotations).ToNot(HaveKey(kcpInPlaceAnnotation)) - mhc := &clusterv1.MachineHealthCheck{} + // Expect MHC for KCP to not be paused err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused")) + g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation)) } func TestKCPReconcileNotFound(t *testing.T) { @@ -332,7 +313,7 @@ func generateKCP(name string) *controlplanev1.KubeadmControlPlane { Namespace: constants.EksaSystemNamespace, UID: "test-uid", Annotations: map[string]string{ - "controlplane.clusters.x-k8s.io/in-place-upgrade-needed": "true", + kcpInPlaceAnnotation: "true", }, }, Spec: controlplanev1.KubeadmControlPlaneSpec{ diff --git a/controllers/machinedeployment_controller.go b/controllers/machinedeployment_controller.go index 3b12b45cc48d..e07b04b80dbf 100644 --- a/controllers/machinedeployment_controller.go +++ b/controllers/machinedeployment_controller.go @@ -114,9 +114,6 @@ func (r *MachineDeploymentReconciler) reconcile(ctx context.Context, log logr.Lo return ctrl.Result{}, fmt.Errorf("unable to retrieve kubernetes version from MachineDeployment \"%s\"", md.Name) } - mdUpgrade := &anywherev1.MachineDeploymentUpgrade{} - mduGetErr := r.client.Get(ctx, GetNamespacedNameType(mdUpgradeName(md.Name), constants.EksaSystemNamespace), mdUpgrade) - mhc := &clusterv1.MachineHealthCheck{} if err := r.client.Get(ctx, GetNamespacedNameType(mdMachineHealthCheckName(md.Name), constants.EksaSystemNamespace), mhc); err != nil { if apierrors.IsNotFound(err) { @@ -129,8 +126,23 @@ func (r *MachineDeploymentReconciler) reconcile(ctx context.Context, log logr.Lo return ctrl.Result{}, err } - if md.Spec.Replicas != nil && (*md.Spec.Replicas == md.Status.UpdatedReplicas) { - if mduGetErr == nil && mdUpgrade.Status.Ready { + machineList, machineRefList, err := r.machinesToUpgrade(ctx, md) + if err != nil { + return ctrl.Result{}, fmt.Errorf("retrieving list of control plane machines: %v", err) + } + + mdUpgrade := &anywherev1.MachineDeploymentUpgrade{} + mduGetErr := r.client.Get(ctx, GetNamespacedNameType(mdUpgradeName(md.Name), constants.EksaSystemNamespace), mdUpgrade) + if mduGetErr == nil { + machinesUpgraded := true + for i := range machineList { + m := machineList[i] + if m.Spec.Version == nil || *m.Spec.Version != mdUpgrade.Spec.KubernetesVersion { + machinesUpgraded = false + break + } + } + if machinesUpgraded && mdUpgrade.Status.Ready { log.Info("Machine deployment upgrade complete, deleting object", "MachineDeploymentUpgrade", mdUpgrade.Name) if err := r.client.Delete(ctx, mdUpgrade); err != nil { return ctrl.Result{}, fmt.Errorf("deleting MachineDeploymentUpgrade object: %v", err) @@ -139,67 +151,48 @@ func (r *MachineDeploymentReconciler) reconcile(ctx context.Context, log logr.Lo if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil { return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err) } - } else if !apierrors.IsNotFound(mduGetErr) { - return ctrl.Result{}, fmt.Errorf("getting MachineDeploymentUpgrade for MachineDeployment %s: %v", md.Name, mduGetErr) + log.Info("MachineDeployment is ready, removing the \"in-place-upgrade-needed\" annotation") + // Remove the in-place-upgrade-needed annotation only after the MachineDeploymentUpgrade object is deleted + delete(md.Annotations, mdInPlaceUpgradeNeededAnnotation) + return ctrl.Result{}, nil } - log.Info("MachineDeployment is ready, removing the \"in-place-upgrade-needed\" annotation") - // Remove the in-place-upgrade-needed annotation only after the MachineDeploymentUpgrade object is deleted - delete(md.Annotations, mdInPlaceUpgradeNeededAnnotation) return ctrl.Result{}, nil } - if mduGetErr != nil { - if apierrors.IsNotFound(mduGetErr) { - log.Info("Creating MachineDeploymentUpgrade object") - machines, err := r.machinesToUpgrade(ctx, md) - if err != nil { - return ctrl.Result{}, fmt.Errorf("retrieving list of control plane machines: %v", err) - } - mdUpgrade, err := machineDeploymentUpgrade(md, machines) - if err != nil { - return ctrl.Result{}, fmt.Errorf("generating MachineDeploymentUpgrade: %v", err) - } + if apierrors.IsNotFound(mduGetErr) { + log.Info("Creating MachineDeploymentUpgrade object") + mdUpgrade, err := machineDeploymentUpgrade(md, machineRefList) + if err != nil { + return ctrl.Result{}, fmt.Errorf("generating MachineDeploymentUpgrade: %v", err) + } - log.Info("Pausing machine deployment machine health check", "MachineHealthCheck", mdMachineHealthCheckName(md.Name)) - if err := pauseMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil { - return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err) - } + log.Info("Pausing machine deployment machine health check", "MachineHealthCheck", mdMachineHealthCheckName(md.Name)) + if err := pauseMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil { + return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err) + } - if err := r.client.Create(ctx, mdUpgrade); client.IgnoreAlreadyExists(err) != nil { - return ctrl.Result{}, fmt.Errorf("failed to create MachineDeploymentUpgrade for MachineDeployment %s: %v", md.Name, err) - } - return ctrl.Result{}, nil + if err := r.client.Create(ctx, mdUpgrade); client.IgnoreAlreadyExists(err) != nil { + return ctrl.Result{}, fmt.Errorf("failed to create MachineDeploymentUpgrade for MachineDeployment %s: %v", md.Name, err) } - return ctrl.Result{}, fmt.Errorf("getting MachineDeploymentUpgrade for MachineDeployment %s: %v", md.Name, mduGetErr) - } - if !mdUpgrade.Status.Ready { return ctrl.Result{}, nil } - log.Info("Machine deployment upgrade complete, deleting object", "MachineDeploymentUpgrade", mdUpgrade.Name) - if err := r.client.Delete(ctx, mdUpgrade); err != nil { - return ctrl.Result{}, fmt.Errorf("deleting MachineDeploymentUpgrade object: %v", err) - } - log.Info("Resuming machine deployment machine health check", "MachineHealthCheck", mdMachineHealthCheckName(md.Name)) - if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil { - return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err) - } + return ctrl.Result{}, fmt.Errorf("getting MachineDeploymentUpgrade for MachineDeployment %s: %v", md.Name, err) - return ctrl.Result{}, nil } func (r *MachineDeploymentReconciler) inPlaceUpgradeNeeded(md *clusterv1.MachineDeployment) bool { return strings.ToLower(md.Annotations[mdInPlaceUpgradeNeededAnnotation]) == "true" } -func (r *MachineDeploymentReconciler) machinesToUpgrade(ctx context.Context, md *clusterv1.MachineDeployment) ([]corev1.ObjectReference, error) { +func (r *MachineDeploymentReconciler) machinesToUpgrade(ctx context.Context, md *clusterv1.MachineDeployment) ([]*clusterv1.Machine, []corev1.ObjectReference, error) { selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{workerMachineLabel: md.Name}}) if err != nil { - return nil, err + return nil, nil, err } machineList := &clusterv1.MachineList{} if err := r.client.List(ctx, machineList, &client.ListOptions{LabelSelector: selector, Namespace: md.Namespace}); err != nil { - return nil, err + return nil, nil, err } machines := collections.FromMachineList(machineList).SortedByCreationTimestamp() machineObjects := make([]corev1.ObjectReference, 0, len(machines)) @@ -212,7 +205,7 @@ func (r *MachineDeploymentReconciler) machinesToUpgrade(ctx context.Context, md }, ) } - return machineObjects, nil + return machines, machineObjects, nil } func machineDeploymentUpgrade(md *clusterv1.MachineDeployment, machines []corev1.ObjectReference) (*anywherev1.MachineDeploymentUpgrade, error) { diff --git a/controllers/machinedeployment_controller_test.go b/controllers/machinedeployment_controller_test.go index 465447b106e0..b14a63da3c8a 100644 --- a/controllers/machinedeployment_controller_test.go +++ b/controllers/machinedeployment_controller_test.go @@ -7,10 +7,10 @@ import ( "time" . "github.com/onsi/gomega" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/utils/pointer" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -20,6 +20,8 @@ import ( "github.com/aws/eks-anywhere/pkg/constants" ) +const mdInPlaceAnnotation = "machinedeployment.clusters.x-k8s.io/in-place-upgrade-needed" + type mdObjects struct { machine *clusterv1.Machine mdUpgrade *anywherev1.MachineDeploymentUpgrade @@ -35,35 +37,12 @@ func TestMDSetupWithManager(t *testing.T) { g.Expect(r.SetupWithManager(env.Manager())).To(Succeed()) } -func TestMDReconcile(t *testing.T) { - g := NewWithT(t) - ctx := context.Background() - mdObjs := getObjectsForMD() - - runtimeObjs := []runtime.Object{mdObjs.machine, mdObjs.mdUpgrade, mdObjs.md, mdObjs.mhc} - client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build() - r := controllers.NewMachineDeploymentReconciler(client) - req := mdRequest(mdObjs.md) - _, err := r.Reconcile(ctx, req) - g.Expect(err).ToNot(HaveOccurred()) - - mdu := &anywherev1.MachineDeploymentUpgrade{} - err = client.Get(ctx, types.NamespacedName{Name: mdObjs.mdUpgrade.Name, Namespace: constants.EksaSystemNamespace}, mdu) - g.Expect(err).ToNot(HaveOccurred()) - - mhc := &clusterv1.MachineHealthCheck{} - err = client.Get(ctx, types.NamespacedName{Name: mdObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused")) -} - -func TestMDReconcileComplete(t *testing.T) { +func TestMDReconcileNotNeeded(t *testing.T) { g := NewWithT(t) ctx := context.Background() mdObjs := getObjectsForMD() - mdObjs.md.Spec.Replicas = pointer.Int32(1) - mdObjs.md.Status.UpdatedReplicas = 1 + delete(mdObjs.md.Annotations, mdInPlaceAnnotation) runtimeObjs := []runtime.Object{mdObjs.md, mdObjs.mhc} client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build() @@ -72,42 +51,32 @@ func TestMDReconcileComplete(t *testing.T) { _, err := r.Reconcile(ctx, req) g.Expect(err).ToNot(HaveOccurred()) - md := &clusterv1.MachineDeployment{} - err = client.Get(ctx, types.NamespacedName{Name: mdObjs.md.Name, Namespace: constants.EksaSystemNamespace}, md) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(md.Annotations).ToNot(HaveKey("machinedeployment.clusters.x-k8s.io/in-place-upgrade-needed")) - mhc := &clusterv1.MachineHealthCheck{} err = client.Get(ctx, types.NamespacedName{Name: mdObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) g.Expect(err).ToNot(HaveOccurred()) - g.Eventually(func(g Gomega) error { - func(g Gomega) { - g.Expect(mhc.Annotations).To(HaveKey("cluster.x-k8s.io/paused")) - }(g) - - return nil - }) - g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused")) + g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation)) } -func TestMDReconcileNotNeeded(t *testing.T) { +func TestMDReconcile(t *testing.T) { g := NewWithT(t) ctx := context.Background() mdObjs := getObjectsForMD() - delete(mdObjs.md.Annotations, "machinedeployment.clusters.x-k8s.io/in-place-upgrade-needed") - - runtimeObjs := []runtime.Object{mdObjs.md, mdObjs.mhc} + runtimeObjs := []runtime.Object{mdObjs.machine, mdObjs.mdUpgrade, mdObjs.md, mdObjs.mhc} client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build() r := controllers.NewMachineDeploymentReconciler(client) req := mdRequest(mdObjs.md) _, err := r.Reconcile(ctx, req) g.Expect(err).ToNot(HaveOccurred()) + mdu := &anywherev1.MachineDeploymentUpgrade{} + err = client.Get(ctx, types.NamespacedName{Name: mdObjs.mdUpgrade.Name, Namespace: constants.EksaSystemNamespace}, mdu) + g.Expect(err).ToNot(HaveOccurred()) + mhc := &clusterv1.MachineHealthCheck{} err = client.Get(ctx, types.NamespacedName{Name: mdObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused")) + g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation)) } func TestMDReconcileCreateMachineDeploymentUpgrade(t *testing.T) { @@ -132,15 +101,16 @@ func TestMDReconcileCreateMachineDeploymentUpgrade(t *testing.T) { mhc := &clusterv1.MachineHealthCheck{} err = client.Get(ctx, types.NamespacedName{Name: mdObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(mhc.Annotations).To(HaveKey("cluster.x-k8s.io/paused")) + g.Expect(mhc.Annotations).To(HaveKey(capiPausedAnnotation)) } -func TestMDReconcileMachineDeploymentUpgradeReady(t *testing.T) { +func TestMDReconcileMDAndMachineDeploymentUpgradeReady(t *testing.T) { g := NewWithT(t) ctx := context.Background() mdObjs := getObjectsForMD() mdObjs.mdUpgrade.Status.Ready = true + mdObjs.machine.Spec.Version = &mdObjs.mdUpgrade.Spec.KubernetesVersion runtimeObjs := []runtime.Object{mdObjs.machine, mdObjs.md, mdObjs.mdUpgrade, mdObjs.mhc} client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build() @@ -152,68 +122,80 @@ func TestMDReconcileMachineDeploymentUpgradeReady(t *testing.T) { mdu := &anywherev1.MachineDeploymentUpgrade{} err = client.Get(ctx, types.NamespacedName{Name: mdObjs.mdUpgrade.Name, Namespace: constants.EksaSystemNamespace}, mdu) g.Expect(err).To(HaveOccurred()) + g.Expect(err).To(MatchError("machinedeploymentupgrades.anywhere.eks.amazonaws.com \"my-cluster-md-upgrade\" not found")) + + md := &clusterv1.MachineDeployment{} + err = client.Get(ctx, types.NamespacedName{Name: mdObjs.md.Name, Namespace: constants.EksaSystemNamespace}, md) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(md.Annotations).ToNot(HaveKey(mdInPlaceAnnotation)) mhc := &clusterv1.MachineHealthCheck{} err = client.Get(ctx, types.NamespacedName{Name: mdObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused")) + g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation)) } -func TestMDReconcileMDAndMachineDeploymentUpgradeReady(t *testing.T) { +func TestMDReconcileFullFlow(t *testing.T) { g := NewWithT(t) ctx := context.Background() mdObjs := getObjectsForMD() - mdObjs.mdUpgrade.Status.Ready = true - mdObjs.md.Status.UpdatedReplicas = *mdObjs.md.Spec.Replicas - - runtimeObjs := []runtime.Object{mdObjs.machine, mdObjs.md, mdObjs.mdUpgrade, mdObjs.mhc} + runtimeObjs := []runtime.Object{mdObjs.machine, mdObjs.md, mdObjs.mhc} client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build() r := controllers.NewMachineDeploymentReconciler(client) req := mdRequest(mdObjs.md) _, err := r.Reconcile(ctx, req) g.Expect(err).ToNot(HaveOccurred()) + // Expect MachineDeploymentUpgrade object to be created and not ready mdu := &anywherev1.MachineDeploymentUpgrade{} err = client.Get(ctx, types.NamespacedName{Name: mdObjs.mdUpgrade.Name, Namespace: constants.EksaSystemNamespace}, mdu) - g.Expect(err).To(HaveOccurred()) - g.Expect(err).To(MatchError("machinedeploymentupgrades.anywhere.eks.amazonaws.com \"my-cluster-md-upgrade\" not found")) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(mdu.Status.Ready).To(BeFalse()) + // Expect KCP to still have in-place annotation md := &clusterv1.MachineDeployment{} err = client.Get(ctx, types.NamespacedName{Name: mdObjs.md.Name, Namespace: constants.EksaSystemNamespace}, md) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(md.Annotations).ToNot(HaveKey("machinedeployment.clusters.x-k8s.io/in-place-upgrade-needed")) + g.Expect(md.Annotations).To(HaveKey(mdInPlaceAnnotation)) + // Expect MHC for KCP to be paused mhc := &clusterv1.MachineHealthCheck{} err = client.Get(ctx, types.NamespacedName{Name: mdObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused")) -} + g.Expect(mhc.Annotations).To(HaveKey(capiPausedAnnotation)) -func TestMDReconcileMDReadyAndMachineDeploymentUpgradeAlreadyDeleted(t *testing.T) { - g := NewWithT(t) - ctx := context.Background() - mdObjs := getObjectsForMD() + machine := &clusterv1.Machine{} + err = client.Get(ctx, types.NamespacedName{Name: mdObjs.machine.Name, Namespace: constants.EksaSystemNamespace}, machine) + g.Expect(err).ToNot(HaveOccurred()) - mdObjs.md.Status.UpdatedReplicas = *mdObjs.md.Spec.Replicas + // Mark MachineDeploymentUpgrade as ready and update Machine K8s version + mdu.Status.Ready = true + err = client.Update(ctx, mdu) + g.Expect(err).ToNot(HaveOccurred()) + machine.Spec.Version = &mdu.Spec.KubernetesVersion + err = client.Update(ctx, machine) + g.Expect(err).ToNot(HaveOccurred()) - runtimeObjs := []runtime.Object{mdObjs.machine, mdObjs.md, mdObjs.mhc} - client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build() - r := controllers.NewMachineDeploymentReconciler(client) - req := mdRequest(mdObjs.md) - _, err := r.Reconcile(ctx, req) + // trigger another reconcile loop + req = mdRequest(md) + _, err = r.Reconcile(ctx, req) g.Expect(err).ToNot(HaveOccurred()) - // verify the in-place-upgrade-needed annotation is removed even when the MachineDeploymentUpgrade object is not found - md := &clusterv1.MachineDeployment{} + // Expect MachineDeploymentUpgrade object to be deleted + err = client.Get(ctx, types.NamespacedName{Name: mdObjs.mdUpgrade.Name, Namespace: constants.EksaSystemNamespace}, mdu) + g.Expect(err).To(HaveOccurred()) + g.Expect(apierrors.IsNotFound(err)).To(BeTrue()) + + // Expect MD to no longer have in-place annotation err = client.Get(ctx, types.NamespacedName{Name: mdObjs.md.Name, Namespace: constants.EksaSystemNamespace}, md) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(md.Annotations).ToNot(HaveKey("machinedeployment.clusters.x-k8s.io/in-place-upgrade-needed")) + g.Expect(md.Annotations).ToNot(HaveKey(mdInPlaceAnnotation)) - mhc := &clusterv1.MachineHealthCheck{} + // Expect MHC for MD to not be paused err = client.Get(ctx, types.NamespacedName{Name: mdObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused")) + g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation)) } func TestMDReconcileNotFound(t *testing.T) {