Skip to content

Commit

Permalink
Refactor machine deployment reconciler
Browse files Browse the repository at this point in the history
  • Loading branch information
HomayoonAlimohammadi committed Nov 8, 2024
1 parent 195e1fe commit d8cc2dc
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,31 @@ import (
bootstrapv1 "github.com/canonical/cluster-api-k8s/bootstrap/api/v1beta2"
"github.com/canonical/cluster-api-k8s/pkg/ck8s"
"github.com/canonical/cluster-api-k8s/pkg/trace"
"github.com/canonical/cluster-api-k8s/pkg/upgrade/inplace"
)

// MachineGetter is an interface that defines the methods a MachineDeploymentReconciler uses to get machines.
type MachineGetter interface {
GetMachinesForCluster(ctx context.Context, cluster client.ObjectKey, filters ...collections.Func) (collections.Machines, error)
}

// MachineDeploymentReconciler reconciles a MachineDeployment object and manages the in-place upgrades.
type MachineDeploymentReconciler struct {
// OrchestratedInPlaceUpgradeController reconciles a MachineDeployment object and manages the in-place upgrades.
type OrchestratedInPlaceUpgradeController struct {
scheme *runtime.Scheme
recorder record.EventRecorder
machineGetter MachineGetter
machineGetter inplace.MachineGetter

client.Client
Log logr.Logger
}

// MachineDeploymentUpgradeScope is a struct that holds the context of the upgrade process.
type MachineDeploymentUpgradeScope struct {
MachineDeployment *clusterv1.MachineDeployment
PatchHelper *patch.Helper
UpgradeTo string
OwnedMachines []*clusterv1.Machine
// orchestratedInPlaceUpgradeScope is a struct that holds the context of the upgrade process.
type orchestratedInPlaceUpgradeScope struct {
machineDeployment *clusterv1.MachineDeployment
mdPatcher inplace.Patcher
upgradeTo string
ownedMachines []*clusterv1.Machine
}

// NewMachineDeploymentReconciler creates a new MachineDeploymentReconciler.
func (r *MachineDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
// SetupWithManager sets up the controller with the Manager.
func (r *OrchestratedInPlaceUpgradeController) SetupWithManager(mgr ctrl.Manager) error {
r.scheme = mgr.GetScheme()
r.recorder = mgr.GetEventRecorderFor("ck8s-machine-deployment-controller")
r.recorder = mgr.GetEventRecorderFor("ck8s-md-orchestrated-inplace-upgrade-controller")

if r.machineGetter == nil {
r.machineGetter = &ck8s.Management{
Expand Down Expand Up @@ -77,9 +73,9 @@ func (r *MachineDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machinedeployments;machinedeployments/status,verbs=get;list;watch;create;update;patch;delete

// Reconcile handles the reconciliation of a MachineDeployment object.
func (r *MachineDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *OrchestratedInPlaceUpgradeController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
traceID := trace.NewID()
log := r.Log.WithValues("machine_deployment", req.NamespacedName, "trace_id", traceID)
log := r.Log.WithValues("orchestrated_inplace_upgrade", req.NamespacedName, "trace_id", traceID)
log.V(1).Info("Reconciliation started...")

machineDeployment := &clusterv1.MachineDeployment{}
Expand All @@ -91,41 +87,36 @@ func (r *MachineDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Re
return ctrl.Result{}, fmt.Errorf("failed to get MachineDeployment: %w", err)
}

if r.getUpgradeInstructions(machineDeployment) == "" {
if inplace.GetUpgradeInstructions(machineDeployment) == "" {
log.V(1).Info("MachineDeployment has no upgrade instructions, skipping reconciliation")
return ctrl.Result{}, nil
}

if !machineDeployment.DeletionTimestamp.IsZero() {
if isDeleted(machineDeployment) {
log.V(1).Info("MachineDeployment is being deleted, skipping reconciliation")
return ctrl.Result{}, nil
}

ownedMachines, err := r.getOwnedMachines(ctx, machineDeployment)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get owned machines: %w", err)
}

scope, err := r.createScope(machineDeployment, ownedMachines)
scope, err := r.createScope(ctx, machineDeployment)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to create scope: %w", err)
}

// Starting the upgrade process
var upgradedMachines int
for _, m := range ownedMachines {
if r.isMachineUpgraded(scope, m) {
for _, m := range scope.ownedMachines {
if inplace.IsUpgraded(m, scope.upgradeTo) {
log.V(1).Info("Machine is already upgraded", "machine", m.Name)
upgradedMachines++
continue
}

if !m.DeletionTimestamp.IsZero() {
if isDeleted(m) {
log.V(1).Info("Machine is being deleted, requeuing...", "machine", m.Name)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

if r.isMachineUpgradeFailed(m) {
if inplace.IsMachineUpgradeFailed(m) {
log.Info("Machine upgrade failed for machine, requeuing...", "machine", m.Name)
if err := r.markUpgradeFailed(ctx, scope, m); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to mark upgrade as failed: %w", err)
Expand All @@ -134,7 +125,7 @@ func (r *MachineDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Re
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

if r.isMachineUpgrading(m) {
if inplace.IsMachineUpgrading(m) {
log.V(1).Info("Machine is upgrading, requeuing...", "machine", m.Name)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
Expand All @@ -153,39 +144,28 @@ func (r *MachineDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Re
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

if upgradedMachines == len(ownedMachines) {
if upgradedMachines == len(scope.ownedMachines) {
if err := r.markUpgradeDone(ctx, scope); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to mark upgrade as done: %w", err)
}

log.V(1).Info("All machines are upgraded")
// Finish
return ctrl.Result{}, nil
}

// Not all the machines were upgraded, requeue.
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

// markUpgradeInProgress marks the MachineDeployment as in-place upgrade in-progress.
func (r *MachineDeploymentReconciler) markUpgradeInProgress(ctx context.Context, scope *MachineDeploymentUpgradeScope, upgradingMachine *clusterv1.Machine) error {
mdAnnotations := scope.MachineDeployment.Annotations
if mdAnnotations == nil {
mdAnnotations = make(map[string]string)
}

// clean up
delete(mdAnnotations, bootstrapv1.InPlaceUpgradeReleaseAnnotation)

mdAnnotations[bootstrapv1.InPlaceUpgradeStatusAnnotation] = bootstrapv1.InPlaceUpgradeInProgressStatus
mdAnnotations[bootstrapv1.InPlaceUpgradeToAnnotation] = scope.UpgradeTo

scope.MachineDeployment.SetAnnotations(mdAnnotations)

if err := scope.PatchHelper.Patch(ctx, scope.MachineDeployment); err != nil {
return fmt.Errorf("failed to patch: %w", err)
func (r *OrchestratedInPlaceUpgradeController) markUpgradeInProgress(ctx context.Context, scope *orchestratedInPlaceUpgradeScope, upgradingMachine *clusterv1.Machine) error {
if err := inplace.MarkUpgradeInProgress(ctx, scope.machineDeployment, scope.upgradeTo, scope.mdPatcher); err != nil {
return fmt.Errorf("failed to mark object with upgrade in-progress: %w", err)
}

r.recorder.Eventf(
scope.MachineDeployment,
scope.machineDeployment,
corev1.EventTypeNormal,
bootstrapv1.InPlaceUpgradeInProgressEvent,
"In-place upgrade is in-progress for %q",
Expand All @@ -195,26 +175,13 @@ func (r *MachineDeploymentReconciler) markUpgradeInProgress(ctx context.Context,
}

// markUpgradeDone marks the MachineDeployment as in-place upgrade done.
func (r *MachineDeploymentReconciler) markUpgradeDone(ctx context.Context, scope *MachineDeploymentUpgradeScope) error {
annotations := scope.MachineDeployment.Annotations
if annotations == nil {
annotations = make(map[string]string)
}

// clean up
delete(annotations, bootstrapv1.InPlaceUpgradeToAnnotation)

annotations[bootstrapv1.InPlaceUpgradeStatusAnnotation] = bootstrapv1.InPlaceUpgradeDoneStatus
annotations[bootstrapv1.InPlaceUpgradeReleaseAnnotation] = scope.UpgradeTo

scope.MachineDeployment.SetAnnotations(annotations)

if err := scope.PatchHelper.Patch(ctx, scope.MachineDeployment); err != nil {
return fmt.Errorf("failed to patch: %w", err)
func (r *OrchestratedInPlaceUpgradeController) markUpgradeDone(ctx context.Context, scope *orchestratedInPlaceUpgradeScope) error {
if err := inplace.MarkUpgradeDone(ctx, scope.machineDeployment, scope.upgradeTo, scope.mdPatcher); err != nil {
return fmt.Errorf("failed to mark object with upgrade done: %w", err)
}

r.recorder.Eventf(
scope.MachineDeployment,
scope.machineDeployment,
corev1.EventTypeNormal,
bootstrapv1.InPlaceUpgradeDoneEvent,
"In-place upgrade is done",
Expand All @@ -223,24 +190,13 @@ func (r *MachineDeploymentReconciler) markUpgradeDone(ctx context.Context, scope
}

// markUpgradeFailed marks the MachineDeployment as in-place upgrade failed.
func (r *MachineDeploymentReconciler) markUpgradeFailed(ctx context.Context, scope *MachineDeploymentUpgradeScope, failedM *clusterv1.Machine) error {
annotations := scope.MachineDeployment.Annotations
if annotations == nil {
annotations = make(map[string]string)
}

// clean up
delete(annotations, bootstrapv1.InPlaceUpgradeReleaseAnnotation)

annotations[bootstrapv1.InPlaceUpgradeStatusAnnotation] = bootstrapv1.InPlaceUpgradeFailedStatus
scope.MachineDeployment.SetAnnotations(annotations)

if err := scope.PatchHelper.Patch(ctx, scope.MachineDeployment); err != nil {
return fmt.Errorf("failed to patch: %w", err)
func (r *OrchestratedInPlaceUpgradeController) markUpgradeFailed(ctx context.Context, scope *orchestratedInPlaceUpgradeScope, failedM *clusterv1.Machine) error {
if err := inplace.MarkUpgradeFailed(ctx, scope.machineDeployment, scope.mdPatcher); err != nil {
return fmt.Errorf("failed to mark object with upgrade failed: %w", err)
}

r.recorder.Eventf(
scope.MachineDeployment,
scope.machineDeployment,
corev1.EventTypeWarning,
bootstrapv1.InPlaceUpgradeFailedEvent,
"In-place upgrade failed for machine %q.",
Expand All @@ -250,22 +206,27 @@ func (r *MachineDeploymentReconciler) markUpgradeFailed(ctx context.Context, sco
}

// createScope creates a new MachineDeploymentUpgradeScope.
func (r *MachineDeploymentReconciler) createScope(md *clusterv1.MachineDeployment, ownedMachines []*clusterv1.Machine) (*MachineDeploymentUpgradeScope, error) {
func (r *OrchestratedInPlaceUpgradeController) createScope(ctx context.Context, md *clusterv1.MachineDeployment) (*orchestratedInPlaceUpgradeScope, error) {
patchHelper, err := patch.NewHelper(md, r.Client)
if err != nil {
return nil, fmt.Errorf("failed to create new patch helper: %w", err)
}

return &MachineDeploymentUpgradeScope{
MachineDeployment: md,
UpgradeTo: r.getUpgradeInstructions(md),
OwnedMachines: ownedMachines,
PatchHelper: patchHelper,
ownedMachines, err := r.getOwnedMachines(ctx, md)
if err != nil {
return nil, fmt.Errorf("failed to get owned machines: %w", err)
}

return &orchestratedInPlaceUpgradeScope{
machineDeployment: md,
upgradeTo: inplace.GetUpgradeInstructions(md),
ownedMachines: ownedMachines,
mdPatcher: patchHelper,
}, nil
}

// getCluster gets the Cluster object for the MachineDeployment.
func (r *MachineDeploymentReconciler) getCluster(ctx context.Context, md *clusterv1.MachineDeployment) (*clusterv1.Cluster, error) {
func (r *OrchestratedInPlaceUpgradeController) getCluster(ctx context.Context, md *clusterv1.MachineDeployment) (*clusterv1.Cluster, error) {
cluster := &clusterv1.Cluster{}
clusterKey := client.ObjectKey{
Namespace: md.Namespace,
Expand All @@ -279,7 +240,7 @@ func (r *MachineDeploymentReconciler) getCluster(ctx context.Context, md *cluste
}

// getOwnedMachines gets the machines owned by the MachineDeployment.
func (r *MachineDeploymentReconciler) getOwnedMachines(ctx context.Context, md *clusterv1.MachineDeployment) ([]*clusterv1.Machine, error) {
func (r *OrchestratedInPlaceUpgradeController) getOwnedMachines(ctx context.Context, md *clusterv1.MachineDeployment) ([]*clusterv1.Machine, error) {
cluster, err := r.getCluster(ctx, md)
if err != nil {
return nil, fmt.Errorf("failed to get cluster: %w", err)
Expand Down Expand Up @@ -348,67 +309,25 @@ func (r *MachineDeploymentReconciler) getOwnedMachines(ctx context.Context, md *
return ownedMachines, nil
}

// isMachineUpgraded checks if the machine is already upgraded.
func (r *MachineDeploymentReconciler) isMachineUpgraded(scope *MachineDeploymentUpgradeScope, m *clusterv1.Machine) bool {
mUpgradeRelease := m.Annotations[bootstrapv1.InPlaceUpgradeReleaseAnnotation]
return mUpgradeRelease == scope.UpgradeTo
}

// isMachineUpgrading checks if the machine is upgrading.
func (r *MachineDeploymentReconciler) isMachineUpgrading(m *clusterv1.Machine) bool {
return m.Annotations[bootstrapv1.InPlaceUpgradeStatusAnnotation] == bootstrapv1.InPlaceUpgradeInProgressStatus ||
m.Annotations[bootstrapv1.InPlaceUpgradeToAnnotation] != ""
}

// isMachineUpgradeFailed checks if the machine upgrade failed.
func (r *MachineDeploymentReconciler) isMachineUpgradeFailed(m *clusterv1.Machine) bool {
return m.Annotations[bootstrapv1.InPlaceUpgradeLastFailedAttemptAtAnnotation] != ""
}

// markMachineToUpgrade marks the machine to upgrade.
func (r *MachineDeploymentReconciler) markMachineToUpgrade(ctx context.Context, scope *MachineDeploymentUpgradeScope, m *clusterv1.Machine) error {
patchHelper, err := patch.NewHelper(m, r.Client)
if err != nil {
return fmt.Errorf("failed to create new patch helper: %w", err)
}

if m.Annotations == nil {
m.Annotations = make(map[string]string)
}

// clean up
delete(m.Annotations, bootstrapv1.InPlaceUpgradeReleaseAnnotation)
delete(m.Annotations, bootstrapv1.InPlaceUpgradeStatusAnnotation)
delete(m.Annotations, bootstrapv1.InPlaceUpgradeChangeIDAnnotation)
delete(m.Annotations, bootstrapv1.InPlaceUpgradeLastFailedAttemptAtAnnotation)

m.Annotations[bootstrapv1.InPlaceUpgradeToAnnotation] = scope.UpgradeTo

if err := patchHelper.Patch(ctx, m); err != nil {
return fmt.Errorf("failed to patch: %w", err)
func (r *OrchestratedInPlaceUpgradeController) markMachineToUpgrade(ctx context.Context, scope *orchestratedInPlaceUpgradeScope, m *clusterv1.Machine) error {
if err := inplace.MarkMachineToUpgrade(ctx, m, scope.upgradeTo, r.Client); err != nil {
return fmt.Errorf("failed to mark machine to upgrade: %w", err)
}

r.recorder.Eventf(
scope.MachineDeployment,
scope.machineDeployment,
corev1.EventTypeNormal,
bootstrapv1.InPlaceUpgradeInProgressEvent,
"Machine %q is upgrading to %q",
m.Name,
scope.UpgradeTo,
scope.upgradeTo,
)

return nil
}

func (r *MachineDeploymentReconciler) getUpgradeInstructions(md *clusterv1.MachineDeployment) string {
// NOTE(Hue): The reason we are checking the `release` annotation as well is that we want to make sure
// we upgrade the new machines that joined after the initial upgrade process.
// The `upgrade-to` overwrites the `release` annotation, because we might have both in case
// the user decides to do another in-place upgrade after a successful one.
upgradeTo := md.Annotations[bootstrapv1.InPlaceUpgradeReleaseAnnotation]
if to, ok := md.Annotations[bootstrapv1.InPlaceUpgradeToAnnotation]; ok {
upgradeTo = to
}

return upgradeTo
// isDeleted returns true if the object is being deleted.
func isDeleted(obj client.Object) bool {
return !obj.GetDeletionTimestamp().IsZero()
}
6 changes: 3 additions & 3 deletions bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ func main() {
os.Exit(1)
}

if err = (&controllers.MachineDeploymentReconciler{
if err = (&controllers.OrchestratedInPlaceUpgradeController{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("MachineDeployment"),
Log: ctrl.Log.WithName("controllers").WithName("OrchestratedInPlaceUpgrade"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MachineDeployment")
setupLog.Error(err, "unable to create controller", "controller", "OrchestratedInPlaceUpgrade")
os.Exit(1)
}

Expand Down

0 comments on commit d8cc2dc

Please sign in to comment.