Pause machine health checks during in-place upgrades
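
In-place upgrades deliberately take machines through states that a MachineHealthCheck would flag as unhealthy, which could trigger remediation mid-upgrade. This change pauses the control plane and worker MachineHealthChecks before creating the ControlPlaneUpgrade and MachineDeploymentUpgrade objects, and resumes them once the upgrade completes. The mechanism is a single annotation toggle; a minimal sketch using the cluster-api helpers the diff relies on (the pause/resume function names here are illustrative, not from this commit):

    package sketch

    import (
        clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
        "sigs.k8s.io/cluster-api/util/annotations"
    )

    // pause sets "cluster.x-k8s.io/paused" on the MachineHealthCheck; the
    // Cluster API MHC controller skips paused objects, so no remediation
    // fires while the annotation is present.
    func pause(mhc *clusterv1.MachineHealthCheck) {
        annotations.AddAnnotations(mhc, map[string]string{clusterv1.PausedAnnotation: "true"})
    }

    // resume removes the annotation so health checking picks back up.
    func resume(mhc *clusterv1.MachineHealthCheck) {
        delete(mhc.Annotations, clusterv1.PausedAnnotation)
    }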
taneyland committed Feb 13, 2024
1 parent b3876c2 commit aaf1d6d
Showing 4 changed files with 227 additions and 14 deletions.
35 changes: 35 additions & 0 deletions controllers/kubeadmcontrolplane_controller.go
@@ -31,6 +31,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
@@ -134,6 +135,18 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
return ctrl.Result{}, nil
}

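// Fetch the MachineHealthCheck for this control plane so it can be paused
// before the in-place upgrade starts and resumed once it completes.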
mhc := &clusterv1.MachineHealthCheck{}
if err := r.client.Get(ctx, GetNamespacedNameType(cpMachineHealthCheckName(kcp.Name), constants.EksaSystemNamespace), mhc); err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, err
}
return ctrl.Result{}, fmt.Errorf("getting MachineHealthCheck %s: %v", cpMachineHealthCheckName(kcp.Name), err)

}
mhcPatchHelper, err := patch.NewHelper(mhc, r.client)
if err != nil {
return ctrl.Result{}, err

}

if cpuGetErr != nil {
if apierrors.IsNotFound(cpuGetErr) {
log.Info("Creating ControlPlaneUpgrade object")
@@ -145,6 +158,14 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
if err != nil {
return ctrl.Result{}, fmt.Errorf("generating ControlPlaneUpgrade: %v", err)
}

log.Info("Pausing control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))

r.pauseMachineHealthCheck(mhc)
if err := mhcPatchHelper.Patch(ctx, mhc); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)

}

if err := r.client.Create(ctx, cpUpgrade); client.IgnoreAlreadyExists(err) != nil {
return ctrl.Result{}, fmt.Errorf("failed to create ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err)
}
@@ -161,6 +182,12 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
return ctrl.Result{}, fmt.Errorf("deleting ControlPlaneUpgrade object: %v", err)
}

log.Info("Resuming control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))
delete(mhc.Annotations, clusterv1.PausedAnnotation)
if err := mhcPatchHelper.Patch(ctx, mhc); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)

}

return ctrl.Result{}, nil
}

@@ -201,6 +228,10 @@ func (r *KubeadmControlPlaneReconciler) validateStackedEtcd(kcp *controlplanev1.
return nil
}

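// pauseMachineHealthCheck sets the Cluster API paused annotation
// ("cluster.x-k8s.io/paused") on the MachineHealthCheck so control plane
// machines are not remediated while they are upgraded in place.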
func (r *KubeadmControlPlaneReconciler) pauseMachineHealthCheck(mhc *clusterv1.MachineHealthCheck) {
annotations.AddAnnotations(mhc, map[string]string{clusterv1.PausedAnnotation: "true"})
}

func controlPlaneUpgrade(kcp *controlplanev1.KubeadmControlPlane, machines []corev1.ObjectReference) (*anywherev1.ControlPlaneUpgrade, error) {
kcpSpec, err := json.Marshal(kcp.Spec)
if err != nil {
@@ -235,3 +266,7 @@ func controlPlaneUpgrade(kcp *controlplanev1.KubeadmControlPlane, machines []cor
func cpUpgradeName(kcpName string) string {
return kcpName + "-cp-upgrade"
}

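// cpMachineHealthCheckName returns the name of the MachineHealthCheck that
// backs the control plane, following the "<kcp-name>-kcp-unhealthy" convention.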
func cpMachineHealthCheckName(kcpName string) string {
return fmt.Sprintf("%s-kcp-unhealthy", kcpName)
}
83 changes: 76 additions & 7 deletions controllers/kubeadmcontrolplane_controller_test.go
@@ -4,7 +4,9 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"testing"
"time"

. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -26,6 +28,7 @@ type kcpObjects struct {
machines []*clusterv1.Machine
cpUpgrade *anywherev1.ControlPlaneUpgrade
kcp *controlplanev1.KubeadmControlPlane
mhc *clusterv1.MachineHealthCheck
}

func TestKCPSetupWithManager(t *testing.T) {
@@ -41,7 +44,7 @@ func TestKCPReconcile(t *testing.T) {
ctx := context.Background()
kcpObjs := getObjectsForKCP()

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
@@ -62,7 +65,7 @@ func TestKCPReconcileComplete(t *testing.T) {
kcpObjs.kcp.Spec.Replicas = pointer.Int32(count)
kcpObjs.kcp.Status.UpdatedReplicas = count

runtimeObjs := []runtime.Object{kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
@@ -73,6 +76,18 @@ func TestKCPReconcileComplete(t *testing.T) {
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed"))

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileNotNeeded(t *testing.T) {
@@ -82,20 +97,25 @@ func TestKCPReconcileNotNeeded(t *testing.T) {

delete(kcpObjs.kcp.Annotations, "controlplane.clusters.x-k8s.io/in-place-upgrade-needed")

runtimeObjs := []runtime.Object{kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileCreateControlPlaneUpgrade(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
@@ -112,6 +132,11 @@ func TestKCPReconcileCreateControlPlaneUpgrade(t *testing.T) {
kcpSpec, err := json.Marshal(kcpObjs.kcp.Spec)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cpu.Spec.ControlPlaneSpecData).To(BeEquivalentTo(base64.StdEncoding.EncodeToString(kcpSpec)))

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).To(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileControlPlaneUpgradeReady(t *testing.T) {
@@ -121,7 +146,7 @@ func TestKCPReconcileControlPlaneUpgradeReady(t *testing.T) {

kcpObjs.cpUpgrade.Status.Ready = true

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
@@ -131,6 +156,11 @@ func TestKCPReconcileControlPlaneUpgradeReady(t *testing.T) {
cpu := &anywherev1.ControlPlaneUpgrade{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu)
g.Expect(err).To(HaveOccurred())

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) {
@@ -141,7 +171,7 @@ func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) {
kcpObjs.kcp.Status.UpdatedReplicas = *kcpObjs.kcp.Spec.Replicas
kcpObjs.cpUpgrade.Status.Ready = true

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
@@ -157,6 +187,11 @@ func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) {
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed"))

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileKCPReadyAndCPUpgradeAlreadyDeleted(t *testing.T) {
@@ -166,7 +201,7 @@ func TestKCPReconcileKCPReadyAndCPUpgradeAlreadyDeleted(t *testing.T) {

kcpObjs.kcp.Status.UpdatedReplicas = *kcpObjs.kcp.Spec.Replicas

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
@@ -178,6 +213,11 @@ func TestKCPReconcileKCPReadyAndCPUpgradeAlreadyDeleted(t *testing.T) {
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed"))

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileNotFound(t *testing.T) {
@@ -192,6 +232,19 @@ func TestKCPReconcileNotFound(t *testing.T) {
g.Expect(err).To(MatchError("kubeadmcontrolplanes.controlplane.cluster.x-k8s.io \"my-cluster\" not found"))
}

func TestKCPReconcileMHCNotFound(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).To(MatchError("machinehealthchecks.cluster.x-k8s.io \"my-cluster-kcp-unhealthy\" not found"))
}

func TestKCPReconcileClusterConfigurationMissing(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
@@ -253,11 +306,13 @@ func getObjectsForKCP() kcpObjects {
Name: kcp.Name,
UID: kcp.UID,
}}
mhc := generateMHCforKCP(kcp.Name)

return kcpObjects{
machines: machines,
cpUpgrade: cpUpgrade,
kcp: kcp,
mhc: mhc,
}
}

@@ -297,3 +352,17 @@ func generateKCP(name string) *controlplanev1.KubeadmControlPlane {
},
}
}

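// generateMHCforKCP returns a minimal MachineHealthCheck named after the
// control plane, mirroring the "<kcp-name>-kcp-unhealthy" object the
// reconciler fetches.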
func generateMHCforKCP(kcpName string) *clusterv1.MachineHealthCheck {
return &clusterv1.MachineHealthCheck{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-kcp-unhealthy", kcpName),
Namespace: "eksa-system",
},
Spec: clusterv1.MachineHealthCheckSpec{
NodeStartupTimeout: &metav1.Duration{
Duration: 20 * time.Minute,
},
},
}
}
35 changes: 35 additions & 0 deletions controllers/machinedeployment_controller.go
@@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
@@ -132,6 +133,18 @@ func (r *MachineDeploymentReconciler) reconcile(ctx context.Context, log logr.Lo
return ctrl.Result{}, nil
}

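// Fetch the MachineHealthCheck for this machine deployment so it can be
// paused before the in-place upgrade starts and resumed once it completes.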
mhc := &clusterv1.MachineHealthCheck{}
if err := r.client.Get(ctx, GetNamespacedNameType(mdMachineHealthCheckName(md.Name), constants.EksaSystemNamespace), mhc); err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, err
}
return ctrl.Result{}, fmt.Errorf("getting MachineHealthCheck %s: %v", mdMachineHealthCheckName(md.Name), err)

}
mhcPatchHelper, err := patch.NewHelper(mhc, r.client)
if err != nil {
return ctrl.Result{}, err

}

if mduGetErr != nil {
if apierrors.IsNotFound(mduGetErr) {
log.Info("Creating MachineDeploymentUpgrade object")
@@ -143,6 +156,14 @@ func (r *MachineDeploymentReconciler) reconcile(ctx context.Context, log logr.Lo
if err != nil {
return ctrl.Result{}, fmt.Errorf("generating MachineDeploymentUpgrade: %v", err)
}

log.Info("Pausing machine deployment machine health check", "MachineHealthCheck", mdMachineHealthCheckName(md.Name))

r.pauseMachineHealthCheck(mhc)
if err := mhcPatchHelper.Patch(ctx, mhc); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)

}

if err := r.client.Create(ctx, mdUpgrade); client.IgnoreAlreadyExists(err) != nil {
return ctrl.Result{}, fmt.Errorf("failed to create MachineDeploymentUpgrade for MachineDeployment %s: %v", md.Name, err)
}
@@ -158,13 +179,23 @@ func (r *MachineDeploymentReconciler) reconcile(ctx context.Context, log logr.Lo
return ctrl.Result{}, fmt.Errorf("deleting MachineDeploymentUpgrade object: %v", err)
}

log.Info("Resuming machine deployment machine health check", "MachineHealthCheck", mdMachineHealthCheckName(md.Name))
delete(mhc.Annotations, clusterv1.PausedAnnotation)
if err := mhcPatchHelper.Patch(ctx, mhc); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)

}

return ctrl.Result{}, nil
}

func (r *MachineDeploymentReconciler) inPlaceUpgradeNeeded(md *clusterv1.MachineDeployment) bool {
return strings.ToLower(md.Annotations[mdInPlaceUpgradeNeededAnnotation]) == "true"
}

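// pauseMachineHealthCheck sets the Cluster API paused annotation
// ("cluster.x-k8s.io/paused") on the MachineHealthCheck so worker machines
// are not remediated while they are upgraded in place.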
func (r *MachineDeploymentReconciler) pauseMachineHealthCheck(mhc *clusterv1.MachineHealthCheck) {
annotations.AddAnnotations(mhc, map[string]string{clusterv1.PausedAnnotation: "true"})
}

func (r *MachineDeploymentReconciler) machinesToUpgrade(ctx context.Context, md *clusterv1.MachineDeployment) ([]corev1.ObjectReference, error) {
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{workerMachineLabel: md.Name}})
if err != nil {
Expand Down Expand Up @@ -220,3 +251,7 @@ func machineDeploymentUpgrade(md *clusterv1.MachineDeployment, machines []corev1
func mdUpgradeName(mdName string) string {
return mdName + "-md-upgrade"
}

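// mdMachineHealthCheckName returns the name of the MachineHealthCheck that
// backs the machine deployment, following the "<md-name>-worker-unhealthy"
// convention.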
func mdMachineHealthCheckName(mdName string) string {
return fmt.Sprintf("%s-worker-unhealthy", mdName)
}