Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-0.19] Fix InPlace upgrade objects cleanup race conditions #7666

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 25 additions & 43 deletions controllers/kubeadmcontrolplane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
return ctrl.Result{}, err
}

cpUpgrade := &anywherev1.ControlPlaneUpgrade{}
cpuGetErr := r.client.Get(ctx, GetNamespacedNameType(cpUpgradeName(kcp.Name), constants.EksaSystemNamespace), cpUpgrade)

mhc := &clusterv1.MachineHealthCheck{}
if err := r.client.Get(ctx, GetNamespacedNameType(cpMachineHealthCheckName(kcp.Name), constants.EksaSystemNamespace), mhc); err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -131,8 +128,10 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
return ctrl.Result{}, err
}

if kcp.Spec.Replicas != nil && (*kcp.Spec.Replicas == kcp.Status.UpdatedReplicas) {
if cpuGetErr == nil && cpUpgrade.Status.Ready {
cpUpgrade := &anywherev1.ControlPlaneUpgrade{}
cpuGetErr := r.client.Get(ctx, GetNamespacedNameType(cpUpgradeName(kcp.Name), constants.EksaSystemNamespace), cpUpgrade)
if cpuGetErr == nil {
if cpUpgrade.Status.Ready && kcp.Status.Version != nil && *kcp.Status.Version == cpUpgrade.Spec.KubernetesVersion {
log.Info("Control plane upgrade complete, deleting object", "ControlPlaneUpgrade", cpUpgrade.Name)
if err := r.client.Delete(ctx, cpUpgrade); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting ControlPlaneUpgrade object: %v", err)
Expand All @@ -141,55 +140,38 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}
} else if !apierrors.IsNotFound(cpuGetErr) {
return ctrl.Result{}, fmt.Errorf("getting ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, cpuGetErr)
}

log.Info("KubeadmControlPlane is ready, removing the \"in-place-upgrade-needed\" annotation")
// Remove the in-place-upgrade-needed annotation only after the ControlPlaneUpgrade object is deleted
delete(kcp.Annotations, kcpInPlaceUpgradeNeededAnnotation)
log.Info("KubeadmControlPlane is ready, removing the \"in-place-upgrade-needed\" annotation")
// Remove the in-place-upgrade-needed annotation only after the ControlPlaneUpgrade object is deleted
delete(kcp.Annotations, kcpInPlaceUpgradeNeededAnnotation)
}
return ctrl.Result{}, nil
}

if cpuGetErr != nil {
if apierrors.IsNotFound(cpuGetErr) {
log.Info("Creating ControlPlaneUpgrade object")
machines, err := r.machinesToUpgrade(ctx, kcp)
if err != nil {
return ctrl.Result{}, fmt.Errorf("retrieving list of control plane machines: %v", err)
}
cpUpgrade, err := controlPlaneUpgrade(kcp, machines)
if err != nil {
return ctrl.Result{}, fmt.Errorf("generating ControlPlaneUpgrade: %v", err)
}
if apierrors.IsNotFound(cpuGetErr) {
log.Info("Creating ControlPlaneUpgrade object")
machines, err := r.machinesToUpgrade(ctx, kcp)
if err != nil {
return ctrl.Result{}, fmt.Errorf("retrieving list of control plane machines: %v", err)
}
cpUpgrade, err := controlPlaneUpgrade(kcp, machines)
if err != nil {
return ctrl.Result{}, fmt.Errorf("generating ControlPlaneUpgrade: %v", err)
}

log.Info("Pausing control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))
if err := pauseMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}
log.Info("Pausing control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))
if err := pauseMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}

if err := r.client.Create(ctx, cpUpgrade); client.IgnoreAlreadyExists(err) != nil {
return ctrl.Result{}, fmt.Errorf("failed to create ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err)
}
return ctrl.Result{}, nil
if err := r.client.Create(ctx, cpUpgrade); client.IgnoreAlreadyExists(err) != nil {
return ctrl.Result{}, fmt.Errorf("failed to create ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err)
}
return ctrl.Result{}, fmt.Errorf("getting ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, cpuGetErr)
}
if !cpUpgrade.Status.Ready {
return ctrl.Result{}, nil
}

log.Info("Control plane upgrade complete, deleting object", "ControlPlaneUpgrade", cpUpgrade.Name)
if err := r.client.Delete(ctx, cpUpgrade); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting ControlPlaneUpgrade object: %v", err)
}

log.Info("Resuming control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))
if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}
return ctrl.Result{}, fmt.Errorf("getting ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err)

return ctrl.Result{}, nil
}

func (r *KubeadmControlPlaneReconciler) inPlaceUpgradeNeeded(kcp *controlplanev1.KubeadmControlPlane) bool {
Expand Down
123 changes: 52 additions & 71 deletions controllers/kubeadmcontrolplane_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

. "github.com/onsi/gomega"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -24,6 +25,11 @@ import (
"github.com/aws/eks-anywhere/pkg/constants"
)

// Annotation keys that the tests in this file set on fixtures and assert on
// after calling Reconcile.
const (
// kcpInPlaceAnnotation is the annotation key that generateKCP sets to "true"
// on the KubeadmControlPlane fixture; tests check whether it is still present
// or has been removed after reconciliation.
// NOTE(review): presumably mirrors the controller's unexported
// kcpInPlaceUpgradeNeededAnnotation value — confirm they stay in sync.
kcpInPlaceAnnotation = "controlplane.clusters.x-k8s.io/in-place-upgrade-needed"
// capiPausedAnnotation is the annotation key the tests look for on the
// MachineHealthCheck to decide whether it is considered paused.
capiPausedAnnotation = "cluster.x-k8s.io/paused"
)

type kcpObjects struct {
machines []*clusterv1.Machine
cpUpgrade *anywherev1.ControlPlaneUpgrade
Expand All @@ -39,31 +45,12 @@ func TestKCPSetupWithManager(t *testing.T) {
g.Expect(r.SetupWithManager(env.Manager())).To(Succeed())
}

func TestKCPReconcile(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())

cpu := &anywherev1.ControlPlaneUpgrade{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu)
g.Expect(err).ToNot(HaveOccurred())
}

func TestKCPReconcileComplete(t *testing.T) {
func TestKCPReconcileNotNeeded(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

count := int32(len(kcpObjs.machines))
kcpObjs.kcp.Spec.Replicas = pointer.Int32(count)
kcpObjs.kcp.Status.UpdatedReplicas = count
delete(kcpObjs.kcp.Annotations, kcpInPlaceAnnotation)

runtimeObjs := []runtime.Object{kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
Expand All @@ -72,42 +59,27 @@ func TestKCPReconcileComplete(t *testing.T) {
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())

kcp := &controlplanev1.KubeadmControlPlane{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed"))

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Eventually(func(g Gomega) error {
func(g Gomega) {
g.Expect(mhc.Annotations).To(HaveKey("cluster.x-k8s.io/paused"))
}(g)

return nil
})
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation))
}

func TestKCPReconcileNotNeeded(t *testing.T) {
func TestKCPReconcile(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

delete(kcpObjs.kcp.Annotations, "controlplane.clusters.x-k8s.io/in-place-upgrade-needed")

runtimeObjs := []runtime.Object{kcpObjs.kcp, kcpObjs.mhc}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
cpu := &anywherev1.ControlPlaneUpgrade{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileCreateControlPlaneUpgrade(t *testing.T) {
Expand Down Expand Up @@ -136,14 +108,15 @@ func TestKCPReconcileCreateControlPlaneUpgrade(t *testing.T) {
mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).To(HaveKey("cluster.x-k8s.io/paused"))
g.Expect(mhc.Annotations).To(HaveKey(capiPausedAnnotation))
}

func TestKCPReconcileControlPlaneUpgradeReady(t *testing.T) {
func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

kcpObjs.kcp.Status.Version = &kcpObjs.kcp.Spec.Version
kcpObjs.cpUpgrade.Status.Ready = true

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
Expand All @@ -156,68 +129,76 @@ func TestKCPReconcileControlPlaneUpgradeReady(t *testing.T) {
cpu := &anywherev1.ControlPlaneUpgrade{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu)
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError("controlplaneupgrades.anywhere.eks.amazonaws.com \"my-cluster-cp-upgrade\" not found"))

kcp := &controlplanev1.KubeadmControlPlane{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey(kcpInPlaceAnnotation))

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation))
}

func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) {
func TestKCPReconcileFullFlow(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

kcpObjs.kcp.Status.UpdatedReplicas = *kcpObjs.kcp.Spec.Replicas
kcpObjs.cpUpgrade.Status.Ready = true

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())

// Expect ControlPlaneUpgrade object to be created and not ready
cpu := &anywherev1.ControlPlaneUpgrade{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu)
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError("controlplaneupgrades.anywhere.eks.amazonaws.com \"my-cluster-cp-upgrade\" not found"))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cpu.Status.Ready).To(BeFalse())

// Expect KCP to still have in-place annotation
kcp := &controlplanev1.KubeadmControlPlane{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed"))
g.Expect(kcp.Annotations).To(HaveKey(kcpInPlaceAnnotation))

// Expect MHC for KCP to be paused
mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}
g.Expect(mhc.Annotations).To(HaveKey(capiPausedAnnotation))

func TestKCPReconcileKCPReadyAndCPUpgradeAlreadyDeleted(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

kcpObjs.kcp.Status.UpdatedReplicas = *kcpObjs.kcp.Spec.Replicas
// Mark ControlPlaneUpgrade as ready and update KCP status K8s version
cpu.Status.Ready = true
err = client.Update(ctx, cpu)
g.Expect(err).ToNot(HaveOccurred())
kcp.Status.Version = &kcp.Spec.Version
err = client.Update(ctx, kcp)
g.Expect(err).ToNot(HaveOccurred())

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
// trigger another reconcile loop
req = kcpRequest(kcp)
_, err = r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())

// verify the in-place-upgrade-needed annotation is removed even when the ControlPlaneUpgrade object is not found
kcp := &controlplanev1.KubeadmControlPlane{}
// Expect ControlPlaneUpgrade object to be deleted
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu)
g.Expect(err).To(HaveOccurred())
g.Expect(apierrors.IsNotFound(err)).To(BeTrue())

// Expect KCP to no longer have in-place annotation
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed"))
g.Expect(kcp.Annotations).ToNot(HaveKey(kcpInPlaceAnnotation))

mhc := &clusterv1.MachineHealthCheck{}
// Expect MHC for KCP to not be paused
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation))
}

func TestKCPReconcileNotFound(t *testing.T) {
Expand Down Expand Up @@ -332,7 +313,7 @@ func generateKCP(name string) *controlplanev1.KubeadmControlPlane {
Namespace: constants.EksaSystemNamespace,
UID: "test-uid",
Annotations: map[string]string{
"controlplane.clusters.x-k8s.io/in-place-upgrade-needed": "true",
kcpInPlaceAnnotation: "true",
},
},
Spec: controlplanev1.KubeadmControlPlaneSpec{
Expand Down
Loading