From 99a161361520d01d4ef613a513b07af58f78fe07 Mon Sep 17 00:00:00 2001 From: Andreas Sommer Date: Tue, 22 Oct 2024 11:10:35 +0200 Subject: [PATCH] Add AWSMachines to back the EC2 instances in AWSMachinePools and AWSManagedMachinePools Co-authored-by: Cameron McAvoy --- ...ture.cluster.x-k8s.io_awsmachinepools.yaml | 4 + ...uster.x-k8s.io_awsmanagedmachinepools.yaml | 4 + config/rbac/role.yaml | 9 + controllers/awsmachine_controller.go | 59 +++-- exp/api/v1beta1/conversion.go | 12 +- exp/api/v1beta1/zz_generated.conversion.go | 32 +-- exp/api/v1beta2/awsmachinepool_types.go | 4 + .../v1beta2/awsmanagedmachinepool_types.go | 4 + exp/api/v1beta2/conditions_consts.go | 5 + exp/api/v1beta2/types.go | 5 + exp/controllers/awsmachinepool_controller.go | 230 ++++++++++++++++-- .../awsmachinepool_controller_test.go | 189 ++++++++++++-- .../awsmanagedmachinepool_controller.go | 48 +++- pkg/cloud/awserrors/errors.go | 3 + pkg/cloud/scope/machine.go | 14 ++ pkg/cloud/services/ec2/instances.go | 2 +- 16 files changed, 544 insertions(+), 80 deletions(-) diff --git a/config/crd/bases/infrastructure.cluster.x-k8s.io_awsmachinepools.yaml b/config/crd/bases/infrastructure.cluster.x-k8s.io_awsmachinepools.yaml index e70f544535..e82a11b803 100644 --- a/config/crd/bases/infrastructure.cluster.x-k8s.io_awsmachinepools.yaml +++ b/config/crd/bases/infrastructure.cluster.x-k8s.io_awsmachinepools.yaml @@ -1159,6 +1159,10 @@ spec: can be added as events to the Machine object and/or logged in the controller's output. type: string + infrastructureMachineKind: + description: InfrastructureMachineKind is the kind of the infrastructure + resources behind MachinePool Machines. + type: string instances: description: Instances contains the status for each instance in the pool diff --git a/config/crd/bases/infrastructure.cluster.x-k8s.io_awsmanagedmachinepools.yaml b/config/crd/bases/infrastructure.cluster.x-k8s.io_awsmanagedmachinepools.yaml index 008bfd9d2e..6b80ef52c9 100644 --- a/config/crd/bases/infrastructure.cluster.x-k8s.io_awsmanagedmachinepools.yaml +++ b/config/crd/bases/infrastructure.cluster.x-k8s.io_awsmanagedmachinepools.yaml @@ -1087,6 +1087,10 @@ spec: can be added as events to the MachinePool object and/or logged in the controller's output. type: string + infrastructureMachineKind: + description: InfrastructureMachineKind is the kind of the infrastructure + resources behind MachinePool Machines. 
+ type: string launchTemplateID: description: The ID of the launch template type: string diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 3ff4afe303..fdb81e86a3 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -126,6 +126,14 @@ rules: - cluster.x-k8s.io resources: - machines + verbs: + - delete + - get + - list + - watch +- apiGroups: + - cluster.x-k8s.io + resources: - machines/status verbs: - get @@ -310,6 +318,7 @@ rules: resources: - awsmachines verbs: + - create - delete - get - list diff --git a/controllers/awsmachine_controller.go b/controllers/awsmachine_controller.go index 7b2f94fd87..cc5e5a0166 100644 --- a/controllers/awsmachine_controller.go +++ b/controllers/awsmachine_controller.go @@ -143,10 +143,11 @@ func (r *AWSMachineReconciler) getObjectStoreService(scope scope.S3Scope) servic return s3.NewService(scope) } -// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=awsmachines,verbs=get;list;watch;update;patch;delete -// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=awsmachines/status,verbs=get;update;patch // +kubebuilder:rbac:groups=controlplane.cluster.x-k8s.io,resources=*,verbs=get;list;watch -// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines;machines/status,verbs=get;list;watch +// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=awsmachines,verbs=create;get;list;watch;update;patch;delete +// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=awsmachines/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines,verbs=get;list;watch;delete +// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines/status,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=secrets;,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=events,verbs=get;list;watch;create;update;patch @@ -459,6 +460,7 @@ func (r *AWSMachineReconciler) findInstance(machineScope *scope.MachineScope, ec return instance, nil } +//nolint:gocyclo func (r *AWSMachineReconciler) reconcileNormal(_ context.Context, machineScope *scope.MachineScope, clusterScope cloud.ClusterScoper, ec2Scope scope.EC2Scope, elbScope scope.ELBScope, objectStoreScope scope.S3Scope) (ctrl.Result, error) { machineScope.Trace("Reconciling AWSMachine") @@ -482,7 +484,7 @@ func (r *AWSMachineReconciler) reconcileNormal(_ context.Context, machineScope * } // Make sure bootstrap data is available and populated. 
- if machineScope.Machine.Spec.Bootstrap.DataSecretName == nil { + if !machineScope.IsMachinePoolMachine() && machineScope.Machine.Spec.Bootstrap.DataSecretName == nil { machineScope.Info("Bootstrap data secret reference is not yet available") conditions.MarkFalse(machineScope.AWSMachine, infrav1.InstanceReadyCondition, infrav1.WaitingForBootstrapDataReason, clusterv1.ConditionSeverityInfo, "") return ctrl.Result{}, nil @@ -497,6 +499,12 @@ func (r *AWSMachineReconciler) reconcileNormal(_ context.Context, machineScope * conditions.MarkUnknown(machineScope.AWSMachine, infrav1.InstanceReadyCondition, infrav1.InstanceNotFoundReason, err.Error()) return ctrl.Result{}, err } + if instance == nil && machineScope.IsMachinePoolMachine() { + err = errors.New("no instance found for machine pool") + machineScope.Error(err, "unable to find instance") + conditions.MarkUnknown(machineScope.AWSMachine, infrav1.InstanceReadyCondition, infrav1.InstanceNotFoundReason, err.Error()) + return ctrl.Result{}, err + } // If the AWSMachine doesn't have our finalizer, add it. if controllerutil.AddFinalizer(machineScope.AWSMachine, infrav1.MachineFinalizer) { @@ -586,9 +594,18 @@ func (r *AWSMachineReconciler) reconcileNormal(_ context.Context, machineScope * conditions.MarkTrue(machineScope.AWSMachine, infrav1.InstanceReadyCondition) case infrav1.InstanceStateShuttingDown, infrav1.InstanceStateTerminated: machineScope.SetNotReady() - machineScope.Info("Unexpected EC2 instance termination", "state", instance.State, "instance-id", *machineScope.GetInstanceID()) - r.Recorder.Eventf(machineScope.AWSMachine, corev1.EventTypeWarning, "InstanceUnexpectedTermination", "Unexpected EC2 instance termination") - conditions.MarkFalse(machineScope.AWSMachine, infrav1.InstanceReadyCondition, infrav1.InstanceTerminatedReason, clusterv1.ConditionSeverityError, "") + + if machineScope.IsMachinePoolMachine() { + // In an auto-scaling group, instance termination is perfectly normal on scale-down + // and therefore should not be reported as error. 
+ machineScope.Info("EC2 instance of machine pool was terminated", "state", instance.State, "instance-id", *machineScope.GetInstanceID()) + r.Recorder.Eventf(machineScope.AWSMachine, corev1.EventTypeNormal, infrav1.InstanceTerminatedReason, "EC2 instance termination") + conditions.MarkFalse(machineScope.AWSMachine, infrav1.InstanceReadyCondition, infrav1.InstanceTerminatedReason, clusterv1.ConditionSeverityInfo, "") + } else { + machineScope.Info("Unexpected EC2 instance termination", "state", instance.State, "instance-id", *machineScope.GetInstanceID()) + r.Recorder.Eventf(machineScope.AWSMachine, corev1.EventTypeWarning, "InstanceUnexpectedTermination", "Unexpected EC2 instance termination") + conditions.MarkFalse(machineScope.AWSMachine, infrav1.InstanceReadyCondition, infrav1.InstanceTerminatedReason, clusterv1.ConditionSeverityError, "") + } default: machineScope.SetNotReady() machineScope.Info("EC2 instance state is undefined", "state", instance.State, "instance-id", *machineScope.GetInstanceID()) @@ -599,14 +616,18 @@ func (r *AWSMachineReconciler) reconcileNormal(_ context.Context, machineScope * } // reconcile the deletion of the bootstrap data secret now that we have updated instance state - if deleteSecretErr := r.deleteBootstrapData(machineScope, clusterScope, objectStoreScope); deleteSecretErr != nil { - r.Log.Error(deleteSecretErr, "unable to delete secrets") - return ctrl.Result{}, deleteSecretErr - } + if !machineScope.IsMachinePoolMachine() { + if deleteSecretErr := r.deleteBootstrapData(machineScope, clusterScope, objectStoreScope); deleteSecretErr != nil { + r.Log.Error(deleteSecretErr, "unable to delete secrets") + return ctrl.Result{}, deleteSecretErr + } - if instance.State == infrav1.InstanceStateTerminated { - machineScope.SetFailureReason(capierrors.UpdateMachineError) - machineScope.SetFailureMessage(errors.Errorf("EC2 instance state %q is unexpected", instance.State)) + // For machine pool machines, it is expected that the ASG terminates instances at any time, + // so no error is logged for those. 
+ if instance.State == infrav1.InstanceStateTerminated { + machineScope.SetFailureReason(capierrors.UpdateMachineError) + machineScope.SetFailureMessage(errors.Errorf("EC2 instance state %q is unexpected", instance.State)) + } } // tasks that can take place during all known instance states @@ -876,9 +897,13 @@ func getIgnitionVersion(scope *scope.MachineScope) string { } func (r *AWSMachineReconciler) deleteBootstrapData(machineScope *scope.MachineScope, clusterScope cloud.ClusterScoper, objectStoreScope scope.S3Scope) error { - _, userDataFormat, err := machineScope.GetRawBootstrapDataWithFormat() - if client.IgnoreNotFound(err) != nil { - return errors.Wrap(err, "failed to get raw userdata") + var userDataFormat string + var err error + if machineScope.Machine.Spec.Bootstrap.DataSecretName != nil { + _, userDataFormat, err = machineScope.GetRawBootstrapDataWithFormat() + if client.IgnoreNotFound(err) != nil { + return errors.Wrap(err, "failed to get raw userdata") + } } if machineScope.UseSecretsManager(userDataFormat) { diff --git a/exp/api/v1beta1/conversion.go b/exp/api/v1beta1/conversion.go index 7c39f1fcbd..db6187c311 100644 --- a/exp/api/v1beta1/conversion.go +++ b/exp/api/v1beta1/conversion.go @@ -52,6 +52,7 @@ func (src *AWSMachinePool) ConvertTo(dstRaw conversion.Hub) error { if restored.Spec.AvailabilityZoneSubnetType != nil { dst.Spec.AvailabilityZoneSubnetType = restored.Spec.AvailabilityZoneSubnetType } + dst.Status.InfrastructureMachineKind = restored.Status.InfrastructureMachineKind if restored.Spec.AWSLaunchTemplate.PrivateDNSName != nil { dst.Spec.AWSLaunchTemplate.PrivateDNSName = restored.Spec.AWSLaunchTemplate.PrivateDNSName @@ -83,7 +84,6 @@ func (src *AWSMachinePoolList) ConvertTo(dstRaw conversion.Hub) error { // ConvertFrom converts the v1beta2 AWSMachinePoolList receiver to v1beta1 AWSMachinePoolList. func (r *AWSMachinePoolList) ConvertFrom(srcRaw conversion.Hub) error { src := srcRaw.(*infrav1exp.AWSMachinePoolList) - return Convert_v1beta2_AWSMachinePoolList_To_v1beta1_AWSMachinePoolList(src, r, nil) } @@ -114,6 +114,8 @@ func (src *AWSManagedMachinePool) ConvertTo(dstRaw conversion.Hub) error { dst.Spec.AvailabilityZoneSubnetType = restored.Spec.AvailabilityZoneSubnetType } + dst.Status.InfrastructureMachineKind = restored.Status.InfrastructureMachineKind + return nil } @@ -133,6 +135,14 @@ func Convert_v1beta2_AWSManagedMachinePoolSpec_To_v1beta1_AWSManagedMachinePoolS return autoConvert_v1beta2_AWSManagedMachinePoolSpec_To_v1beta1_AWSManagedMachinePoolSpec(in, out, s) } +func Convert_v1beta2_AWSMachinePoolStatus_To_v1beta1_AWSMachinePoolStatus(in *infrav1exp.AWSMachinePoolStatus, out *AWSMachinePoolStatus, s apiconversion.Scope) error { + return autoConvert_v1beta2_AWSMachinePoolStatus_To_v1beta1_AWSMachinePoolStatus(in, out, s) +} + +func Convert_v1beta2_AWSManagedMachinePoolStatus_To_v1beta1_AWSManagedMachinePoolStatus(in *infrav1exp.AWSManagedMachinePoolStatus, out *AWSManagedMachinePoolStatus, s apiconversion.Scope) error { + return autoConvert_v1beta2_AWSManagedMachinePoolStatus_To_v1beta1_AWSManagedMachinePoolStatus(in, out, s) +} + // ConvertTo converts the v1beta1 AWSManagedMachinePoolList receiver to a v1beta2 AWSManagedMachinePoolList. 
func (src *AWSManagedMachinePoolList) ConvertTo(dstRaw conversion.Hub) error { dst := dstRaw.(*infrav1exp.AWSManagedMachinePoolList) diff --git a/exp/api/v1beta1/zz_generated.conversion.go b/exp/api/v1beta1/zz_generated.conversion.go index 585cbd1504..34bc6e0cb4 100644 --- a/exp/api/v1beta1/zz_generated.conversion.go +++ b/exp/api/v1beta1/zz_generated.conversion.go @@ -100,11 +100,6 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } - if err := s.AddGeneratedConversionFunc((*v1beta2.AWSMachinePoolStatus)(nil), (*AWSMachinePoolStatus)(nil), func(a, b interface{}, scope conversion.Scope) error { - return Convert_v1beta2_AWSMachinePoolStatus_To_v1beta1_AWSMachinePoolStatus(a.(*v1beta2.AWSMachinePoolStatus), b.(*AWSMachinePoolStatus), scope) - }); err != nil { - return err - } if err := s.AddGeneratedConversionFunc((*AWSManagedMachinePool)(nil), (*v1beta2.AWSManagedMachinePool)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta1_AWSManagedMachinePool_To_v1beta2_AWSManagedMachinePool(a.(*AWSManagedMachinePool), b.(*v1beta2.AWSManagedMachinePool), scope) }); err != nil { @@ -135,11 +130,6 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } - if err := s.AddGeneratedConversionFunc((*v1beta2.AWSManagedMachinePoolStatus)(nil), (*AWSManagedMachinePoolStatus)(nil), func(a, b interface{}, scope conversion.Scope) error { - return Convert_v1beta2_AWSManagedMachinePoolStatus_To_v1beta1_AWSManagedMachinePoolStatus(a.(*v1beta2.AWSManagedMachinePoolStatus), b.(*AWSManagedMachinePoolStatus), scope) - }); err != nil { - return err - } if err := s.AddGeneratedConversionFunc((*BlockDeviceMapping)(nil), (*v1beta2.BlockDeviceMapping)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta1_BlockDeviceMapping_To_v1beta2_BlockDeviceMapping(a.(*BlockDeviceMapping), b.(*v1beta2.BlockDeviceMapping), scope) }); err != nil { @@ -300,11 +290,21 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddConversionFunc((*v1beta2.AWSMachinePoolStatus)(nil), (*AWSMachinePoolStatus)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta2_AWSMachinePoolStatus_To_v1beta1_AWSMachinePoolStatus(a.(*v1beta2.AWSMachinePoolStatus), b.(*AWSMachinePoolStatus), scope) + }); err != nil { + return err + } if err := s.AddConversionFunc((*v1beta2.AWSManagedMachinePoolSpec)(nil), (*AWSManagedMachinePoolSpec)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta2_AWSManagedMachinePoolSpec_To_v1beta1_AWSManagedMachinePoolSpec(a.(*v1beta2.AWSManagedMachinePoolSpec), b.(*AWSManagedMachinePoolSpec), scope) }); err != nil { return err } + if err := s.AddConversionFunc((*v1beta2.AWSManagedMachinePoolStatus)(nil), (*AWSManagedMachinePoolStatus)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta2_AWSManagedMachinePoolStatus_To_v1beta1_AWSManagedMachinePoolStatus(a.(*v1beta2.AWSManagedMachinePoolStatus), b.(*AWSManagedMachinePoolStatus), scope) + }); err != nil { + return err + } if err := s.AddConversionFunc((*v1beta2.AutoScalingGroup)(nil), (*AutoScalingGroup)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta2_AutoScalingGroup_To_v1beta1_AutoScalingGroup(a.(*v1beta2.AutoScalingGroup), b.(*AutoScalingGroup), scope) }); err != nil { @@ -593,17 +593,13 @@ func autoConvert_v1beta2_AWSMachinePoolStatus_To_v1beta1_AWSMachinePoolStatus(in out.Instances = 
*(*[]AWSMachinePoolInstanceStatus)(unsafe.Pointer(&in.Instances)) out.LaunchTemplateID = in.LaunchTemplateID out.LaunchTemplateVersion = (*string)(unsafe.Pointer(in.LaunchTemplateVersion)) + // WARNING: in.InfrastructureMachineKind requires manual conversion: does not exist in peer-type out.FailureReason = (*errors.MachineStatusError)(unsafe.Pointer(in.FailureReason)) out.FailureMessage = (*string)(unsafe.Pointer(in.FailureMessage)) out.ASGStatus = (*ASGStatus)(unsafe.Pointer(in.ASGStatus)) return nil } -// Convert_v1beta2_AWSMachinePoolStatus_To_v1beta1_AWSMachinePoolStatus is an autogenerated conversion function. -func Convert_v1beta2_AWSMachinePoolStatus_To_v1beta1_AWSMachinePoolStatus(in *v1beta2.AWSMachinePoolStatus, out *AWSMachinePoolStatus, s conversion.Scope) error { - return autoConvert_v1beta2_AWSMachinePoolStatus_To_v1beta1_AWSMachinePoolStatus(in, out, s) -} - func autoConvert_v1beta1_AWSManagedMachinePool_To_v1beta2_AWSManagedMachinePool(in *AWSManagedMachinePool, out *v1beta2.AWSManagedMachinePool, s conversion.Scope) error { out.ObjectMeta = in.ObjectMeta if err := Convert_v1beta1_AWSManagedMachinePoolSpec_To_v1beta2_AWSManagedMachinePoolSpec(&in.Spec, &out.Spec, s); err != nil { @@ -765,17 +761,13 @@ func autoConvert_v1beta2_AWSManagedMachinePoolStatus_To_v1beta1_AWSManagedMachin out.Replicas = in.Replicas out.LaunchTemplateID = (*string)(unsafe.Pointer(in.LaunchTemplateID)) out.LaunchTemplateVersion = (*string)(unsafe.Pointer(in.LaunchTemplateVersion)) + // WARNING: in.InfrastructureMachineKind requires manual conversion: does not exist in peer-type out.FailureReason = (*errors.MachineStatusError)(unsafe.Pointer(in.FailureReason)) out.FailureMessage = (*string)(unsafe.Pointer(in.FailureMessage)) out.Conditions = *(*clusterapiapiv1beta1.Conditions)(unsafe.Pointer(&in.Conditions)) return nil } -// Convert_v1beta2_AWSManagedMachinePoolStatus_To_v1beta1_AWSManagedMachinePoolStatus is an autogenerated conversion function. -func Convert_v1beta2_AWSManagedMachinePoolStatus_To_v1beta1_AWSManagedMachinePoolStatus(in *v1beta2.AWSManagedMachinePoolStatus, out *AWSManagedMachinePoolStatus, s conversion.Scope) error { - return autoConvert_v1beta2_AWSManagedMachinePoolStatus_To_v1beta1_AWSManagedMachinePoolStatus(in, out, s) -} - func autoConvert_v1beta1_AutoScalingGroup_To_v1beta2_AutoScalingGroup(in *AutoScalingGroup, out *v1beta2.AutoScalingGroup, s conversion.Scope) error { out.ID = in.ID out.Tags = *(*apiv1beta2.Tags)(unsafe.Pointer(&in.Tags)) diff --git a/exp/api/v1beta2/awsmachinepool_types.go b/exp/api/v1beta2/awsmachinepool_types.go index 526876bcfd..ee341102bc 100644 --- a/exp/api/v1beta2/awsmachinepool_types.go +++ b/exp/api/v1beta2/awsmachinepool_types.go @@ -210,6 +210,10 @@ type AWSMachinePoolStatus struct { // +optional LaunchTemplateVersion *string `json:"launchTemplateVersion,omitempty"` + // InfrastructureMachineKind is the kind of the infrastructure resources behind MachinePool Machines. + // +optional + InfrastructureMachineKind string `json:"infrastructureMachineKind,omitempty"` + // FailureReason will be set in the event that there is a terminal problem // reconciling the Machine and will contain a succinct value suitable // for machine interpretation. 
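Illustrative sketch (not part of the diff): the AWSMachines created for a pool carry the clusterv1.MachinePoolNameLabel and clusterv1.ClusterNameLabel labels, so they can be listed with the same selector the controllers use in getAWSMachines below. The helper name listPoolAWSMachines is hypothetical.

package example

import (
	"context"

	infrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	expclusterv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// listPoolAWSMachines lists the AWSMachines backing a MachinePool via the labels
// that the AWSMachinePool/AWSManagedMachinePool controllers set on created AWSMachines.
func listPoolAWSMachines(ctx context.Context, c client.Client, mp *expclusterv1.MachinePool) (*infrav1.AWSMachineList, error) {
	list := &infrav1.AWSMachineList{}
	err := c.List(ctx, list,
		client.InNamespace(mp.Namespace),
		client.MatchingLabels(map[string]string{
			clusterv1.MachinePoolNameLabel: mp.Name,
			clusterv1.ClusterNameLabel:     mp.Spec.ClusterName,
		}),
	)
	return list, err
}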
diff --git a/exp/api/v1beta2/awsmanagedmachinepool_types.go b/exp/api/v1beta2/awsmanagedmachinepool_types.go index c7e70fcf55..e1e2dfc102 100644 --- a/exp/api/v1beta2/awsmanagedmachinepool_types.go +++ b/exp/api/v1beta2/awsmanagedmachinepool_types.go @@ -199,6 +199,10 @@ type AWSManagedMachinePoolStatus struct { // +optional LaunchTemplateVersion *string `json:"launchTemplateVersion,omitempty"` + // InfrastructureMachineKind is the kind of the infrastructure resources behind MachinePool Machines. + // +optional + InfrastructureMachineKind string `json:"infrastructureMachineKind,omitempty"` + // FailureReason will be set in the event that there is a terminal problem // reconciling the MachinePool and will contain a succinct value suitable // for machine interpretation. diff --git a/exp/api/v1beta2/conditions_consts.go b/exp/api/v1beta2/conditions_consts.go index 2d052fae53..d2824bb470 100644 --- a/exp/api/v1beta2/conditions_consts.go +++ b/exp/api/v1beta2/conditions_consts.go @@ -54,6 +54,11 @@ const ( InstanceRefreshNotReadyReason = "InstanceRefreshNotReady" // InstanceRefreshFailedReason used to report when there instance refresh is not initiated. InstanceRefreshFailedReason = "InstanceRefreshFailed" + + // AWSMachineCreationFailed reports if creating AWSMachines to represent ASG (machine pool) machines failed. + AWSMachineCreationFailed = "AWSMachineCreationFailed" + // AWSMachineDeletionFailed reports if deleting AWSMachines failed. + AWSMachineDeletionFailed = "AWSMachineDeletionFailed" ) const ( diff --git a/exp/api/v1beta2/types.go b/exp/api/v1beta2/types.go index 0bc4009a2e..fef54d1339 100644 --- a/exp/api/v1beta2/types.go +++ b/exp/api/v1beta2/types.go @@ -22,6 +22,11 @@ import ( infrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2" ) +const ( + // KindMachinePool is a MachinePool resource Kind + KindMachinePool string = "MachinePool" +) + // EBS can be used to automatically set up EBS volumes when an instance is launched. type EBS struct { // Encrypted is whether the volume should be encrypted or not. diff --git a/exp/controllers/awsmachinepool_controller.go b/exp/controllers/awsmachinepool_controller.go index 741cdcdb10..e00b75ad21 100644 --- a/exp/controllers/awsmachinepool_controller.go +++ b/exp/controllers/awsmachinepool_controller.go @@ -20,7 +20,10 @@ package controllers import ( "context" "fmt" + "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/go-logr/logr" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/pkg/errors" @@ -172,19 +175,25 @@ func (r *AWSMachinePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reque } }() + // Patch now so that the status and selectors are available. 
+ awsMachinePool.Status.InfrastructureMachineKind = "AWSMachine" + if err := machinePoolScope.PatchObject(); err != nil { + return ctrl.Result{}, errors.Wrap(err, "failed to patch AWSMachinePool status") + } + switch infraScope := infraCluster.(type) { case *scope.ManagedControlPlaneScope: if !awsMachinePool.ObjectMeta.DeletionTimestamp.IsZero() { - return ctrl.Result{}, r.reconcileDelete(machinePoolScope, infraScope, infraScope) + return ctrl.Result{}, r.reconcileDelete(ctx, machinePoolScope, infraScope, infraScope) } - return ctrl.Result{}, r.reconcileNormal(ctx, machinePoolScope, infraScope, infraScope) + return r.reconcileNormal(ctx, machinePoolScope, infraScope, infraScope) case *scope.ClusterScope: if !awsMachinePool.ObjectMeta.DeletionTimestamp.IsZero() { - return ctrl.Result{}, r.reconcileDelete(machinePoolScope, infraScope, infraScope) + return ctrl.Result{}, r.reconcileDelete(ctx, machinePoolScope, infraScope, infraScope) } - return ctrl.Result{}, r.reconcileNormal(ctx, machinePoolScope, infraScope, infraScope) + return r.reconcileNormal(ctx, machinePoolScope, infraScope, infraScope) default: return ctrl.Result{}, errors.New("infraCluster has unknown type") } @@ -202,7 +211,7 @@ func (r *AWSMachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctr Complete(r) } -func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machinePoolScope *scope.MachinePoolScope, clusterScope cloud.ClusterScoper, ec2Scope scope.EC2Scope) error { +func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machinePoolScope *scope.MachinePoolScope, clusterScope cloud.ClusterScoper, ec2Scope scope.EC2Scope) (ctrl.Result, error) { clusterScope.Info("Reconciling AWSMachinePool") // If the AWSMachine is in an error state, return early. 
@@ -211,28 +220,28 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP // TODO: If we are in a failed state, delete the secret regardless of instance state - return nil + return ctrl.Result{}, nil } // If the AWSMachinepool doesn't have our finalizer, add it if controllerutil.AddFinalizer(machinePoolScope.AWSMachinePool, expinfrav1.MachinePoolFinalizer) { // Register finalizer immediately to avoid orphaning AWS resources if err := machinePoolScope.PatchObject(); err != nil { - return err + return ctrl.Result{}, err } } if !machinePoolScope.Cluster.Status.InfrastructureReady { machinePoolScope.Info("Cluster infrastructure is not ready yet") conditions.MarkFalse(machinePoolScope.AWSMachinePool, expinfrav1.ASGReadyCondition, infrav1.WaitingForClusterInfrastructureReason, clusterv1.ConditionSeverityInfo, "") - return nil + return ctrl.Result{}, nil } // Make sure bootstrap data is available and populated if machinePoolScope.MachinePool.Spec.Template.Spec.Bootstrap.DataSecretName == nil { machinePoolScope.Info("Bootstrap data secret reference is not yet available") conditions.MarkFalse(machinePoolScope.AWSMachinePool, expinfrav1.ASGReadyCondition, infrav1.WaitingForBootstrapDataReason, clusterv1.ConditionSeverityInfo, "") - return nil + return ctrl.Result{}, nil } ec2Svc := r.getEC2Service(ec2Scope) @@ -243,7 +252,7 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP asg, err := r.findASG(machinePoolScope, asgsvc) if err != nil { conditions.MarkUnknown(machinePoolScope.AWSMachinePool, expinfrav1.ASGReadyCondition, expinfrav1.ASGNotFoundReason, err.Error()) - return err + return ctrl.Result{}, err } canUpdateLaunchTemplate := func() (bool, error) { @@ -283,7 +292,7 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP if err := reconSvc.ReconcileLaunchTemplate(machinePoolScope, ec2Svc, canUpdateLaunchTemplate, runPostLaunchTemplateUpdateOperation); err != nil { r.Recorder.Eventf(machinePoolScope.AWSMachinePool, corev1.EventTypeWarning, "FailedLaunchTemplateReconcile", "Failed to reconcile launch template: %v", err) machinePoolScope.Error(err, "failed to reconcile launch template") - return err + return ctrl.Result{}, err } // set the LaunchTemplateReady condition @@ -293,9 +302,28 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP // Create new ASG if err := r.createPool(machinePoolScope, clusterScope); err != nil { conditions.MarkFalse(machinePoolScope.AWSMachinePool, expinfrav1.ASGReadyCondition, expinfrav1.ASGProvisionFailedReason, clusterv1.ConditionSeverityError, err.Error()) - return err + return ctrl.Result{}, err } - return nil + return ctrl.Result{ + RequeueAfter: 15 * time.Second, + }, nil + } + + awsMachineList, err := getAWSMachines(ctx, machinePoolScope.MachinePool, r.Client) + if err != nil { + return ctrl.Result{}, err + } + + if err := createAWSMachinesIfNotExists(ctx, awsMachineList, machinePoolScope.MachinePool, asg, machinePoolScope.GetLogger(), r.Client, ec2Svc); err != nil { + machinePoolScope.SetNotReady() + conditions.MarkFalse(machinePoolScope.AWSMachinePool, clusterv1.ReadyCondition, expinfrav1.AWSMachineCreationFailed, clusterv1.ConditionSeverityWarning, "%s", err.Error()) + return ctrl.Result{}, fmt.Errorf("failed to create awsmachines: %w", err) + } + + if err := deleteOrphanedAWSMachines(ctx, awsMachineList, asg, machinePoolScope.GetLogger(), r.Client); err != nil { + machinePoolScope.SetNotReady() + 
conditions.MarkFalse(machinePoolScope.AWSMachinePool, clusterv1.ReadyCondition, expinfrav1.AWSMachineDeletionFailed, clusterv1.ConditionSeverityWarning, "%s", err.Error()) + return ctrl.Result{}, fmt.Errorf("failed to clean up awsmachines: %w", err) } if annotations.ReplicasManagedByExternalAutoscaler(machinePoolScope.MachinePool) { @@ -306,14 +334,14 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP "external", asg.DesiredCapacity) machinePoolScope.MachinePool.Spec.Replicas = asg.DesiredCapacity if err := machinePoolScope.PatchCAPIMachinePoolObject(ctx); err != nil { - return err + return ctrl.Result{}, err } } } if err := r.updatePool(machinePoolScope, clusterScope, asg); err != nil { machinePoolScope.Error(err, "error updating AWSMachinePool") - return err + return ctrl.Result{}, err } launchTemplateID := machinePoolScope.GetLaunchTemplateIDStatus() @@ -330,7 +358,7 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP } err = reconSvc.ReconcileTags(machinePoolScope, resourceServiceToUpdate) if err != nil { - return errors.Wrap(err, "error updating tags") + return ctrl.Result{}, errors.Wrap(err, "error updating tags") } // Make sure Spec.ProviderID is always set. @@ -353,11 +381,19 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP machinePoolScope.Error(err, "failed updating instances", "instances", asg.Instances) } - return nil + return ctrl.Result{ + // Regularly update `AWSMachine` objects, for example if ASG was scaled or refreshed instances + // TODO: Requeueing interval can be removed or prolonged once reconciliation of ASG EC2 instances + // can be triggered by events. + RequeueAfter: 3 * time.Minute, + }, nil } -func (r *AWSMachinePoolReconciler) reconcileDelete(machinePoolScope *scope.MachinePoolScope, clusterScope cloud.ClusterScoper, ec2Scope scope.EC2Scope) error { +func (r *AWSMachinePoolReconciler) reconcileDelete(ctx context.Context, machinePoolScope *scope.MachinePoolScope, clusterScope cloud.ClusterScoper, ec2Scope scope.EC2Scope) error { clusterScope.Info("Handling deleted AWSMachinePool") + if err := reconcileDeleteAWSMachines(ctx, machinePoolScope.MachinePool, r.Client, machinePoolScope.GetLogger()); err != nil { + return err + } ec2Svc := r.getEC2Service(ec2Scope) asgSvc := r.getASGService(clusterScope) @@ -415,6 +451,164 @@ func (r *AWSMachinePoolReconciler) reconcileDelete(machinePoolScope *scope.Machi return nil } +func reconcileDeleteAWSMachines(ctx context.Context, mp *expclusterv1.MachinePool, client client.Client, l logr.Logger) error { + awsMachineList, err := getAWSMachines(ctx, mp, client) + if err != nil { + return err + } + for i := range awsMachineList.Items { + awsMachine := awsMachineList.Items[i] + if awsMachine.DeletionTimestamp.IsZero() { + continue + } + logger := l.WithValues("awsmachine", klog.KObj(&awsMachine)) + // delete the owner Machine resource for the AWSMachine so that CAPI can clean up gracefully + machine, err := util.GetOwnerMachine(ctx, client, awsMachine.ObjectMeta) + if err != nil { + logger.V(2).Info("Failed to get owner Machine", "err", err.Error()) + continue + } + + if err := client.Delete(ctx, machine); err != nil { + logger.V(2).Info("Failed to delete owner Machine", "err", err.Error()) + } + } + return nil +} + +func getAWSMachines(ctx context.Context, mp *expclusterv1.MachinePool, kubeClient client.Client) (*infrav1.AWSMachineList, error) { + awsMachineList := &infrav1.AWSMachineList{} + labels := map[string]string{ + 
clusterv1.MachinePoolNameLabel: mp.Name, + clusterv1.ClusterNameLabel: mp.Spec.ClusterName, + } + if err := kubeClient.List(ctx, awsMachineList, client.InNamespace(mp.Namespace), client.MatchingLabels(labels)); err != nil { + return nil, err + } + return awsMachineList, nil +} + +func createAWSMachinesIfNotExists(ctx context.Context, awsMachineList *infrav1.AWSMachineList, mp *expclusterv1.MachinePool, existingASG *expinfrav1.AutoScalingGroup, l logr.Logger, client client.Client, ec2Svc services.EC2Interface) error { + l.V(4).Info("Creating missing AWSMachines") + + providerIDToAWSMachine := make(map[string]infrav1.AWSMachine, len(awsMachineList.Items)) + for i := range awsMachineList.Items { + awsMachine := awsMachineList.Items[i] + if awsMachine.Spec.ProviderID == nil || *awsMachine.Spec.ProviderID == "" { + continue + } + providerID := *awsMachine.Spec.ProviderID + providerIDToAWSMachine[providerID] = awsMachine + } + + for i := range existingASG.Instances { + instanceID := existingASG.Instances[i].ID + providerID := fmt.Sprintf("aws:///%s/%s", existingASG.Instances[i].AvailabilityZone, instanceID) + + instanceLogger := l.WithValues("providerID", providerID, "instanceID", instanceID, "asg", existingASG.Name) + instanceLogger.V(4).Info("Checking if machine pool AWSMachine is up to date") + if _, exists := providerIDToAWSMachine[providerID]; exists { + continue + } + + instance, err := ec2Svc.InstanceIfExists(&instanceID) + if errors.Is(err, ec2.ErrInstanceNotFoundByID) { + instanceLogger.V(4).Info("Instance not found, it may have already been deleted") + continue + } + if err != nil { + return fmt.Errorf("Failed to look up EC2 instance %q: %w", instanceID, err) + } + + securityGroups := make([]infrav1.AWSResourceReference, 0, len(instance.SecurityGroupIDs)) + for j := range instance.SecurityGroupIDs { + securityGroups = append(securityGroups, infrav1.AWSResourceReference{ + ID: aws.String(instance.SecurityGroupIDs[j]), + }) + } + + awsMachine := &infrav1.AWSMachine{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: mp.Namespace, + GenerateName: fmt.Sprintf("%s-", existingASG.Name), + Labels: map[string]string{ + clusterv1.MachinePoolNameLabel: mp.Name, + clusterv1.ClusterNameLabel: mp.Spec.ClusterName, + }, + }, + Spec: infrav1.AWSMachineSpec{ + ProviderID: aws.String(providerID), + InstanceID: aws.String(instanceID), + + // Store some extra fields for informational purposes (not needed by CAPA) + AMI: infrav1.AMIReference{ + ID: aws.String(instance.ImageID), + }, + InstanceType: instance.Type, + PublicIP: aws.Bool(instance.PublicIP != nil), + SSHKeyName: instance.SSHKeyName, + InstanceMetadataOptions: instance.InstanceMetadataOptions, + IAMInstanceProfile: instance.IAMProfile, + AdditionalSecurityGroups: securityGroups, + Subnet: &infrav1.AWSResourceReference{ID: aws.String(instance.SubnetID)}, + RootVolume: instance.RootVolume, + NonRootVolumes: instance.NonRootVolumes, + NetworkInterfaces: instance.NetworkInterfaces, + CloudInit: infrav1.CloudInit{}, + SpotMarketOptions: instance.SpotMarketOptions, + Tenancy: instance.Tenancy, + }, + } + instanceLogger.V(4).Info("Creating AWSMachine") + if err := client.Create(ctx, awsMachine); err != nil { + return fmt.Errorf("failed to create AWSMachine: %w", err) + } + } + return nil +} + +func deleteOrphanedAWSMachines(ctx context.Context, awsMachineList *infrav1.AWSMachineList, existingASG *expinfrav1.AutoScalingGroup, l logr.Logger, client client.Client) error { + l.V(4).Info("Deleting orphaned AWSMachines") + providerIDToInstance := 
make(map[string]infrav1.Instance, len(existingASG.Instances)) + for i := range existingASG.Instances { + providerID := fmt.Sprintf("aws:///%s/%s", existingASG.Instances[i].AvailabilityZone, existingASG.Instances[i].ID) + providerIDToInstance[providerID] = existingASG.Instances[i] + } + + for i := range awsMachineList.Items { + awsMachine := awsMachineList.Items[i] + if awsMachine.Spec.ProviderID == nil || *awsMachine.Spec.ProviderID == "" { + continue + } + + providerID := *awsMachine.Spec.ProviderID + if _, exists := providerIDToInstance[providerID]; exists { + continue + } + + machine, err := util.GetOwnerMachine(ctx, client, awsMachine.ObjectMeta) + if err != nil { + return fmt.Errorf("failed to get owner Machine for %s/%s: %w", awsMachine.Namespace, awsMachine.Name, err) + } + machineLogger := l.WithValues("machine", klog.KObj(machine), "awsmachine", klog.KObj(&awsMachine), "ProviderID", providerID) + machineLogger.V(4).Info("Deleting orphaned Machine") + if machine == nil { + machineLogger.Info("No machine owner found for AWSMachine, deleting AWSMachine anyway.") + if err := client.Delete(ctx, &awsMachine); err != nil { + return fmt.Errorf("failed to delete orphan AWSMachine %s/%s: %w", awsMachine.Namespace, awsMachine.Name, err) + } + machineLogger.V(4).Info("Deleted AWSMachine") + continue + } + + if err := client.Delete(ctx, machine); err != nil { + return fmt.Errorf("failed to delete orphan Machine %s/%s: %w", machine.Namespace, machine.Name, err) + } + machineLogger.V(4).Info("Deleted Machine") + } + return nil +} + func (r *AWSMachinePoolReconciler) updatePool(machinePoolScope *scope.MachinePoolScope, clusterScope cloud.ClusterScoper, existingASG *expinfrav1.AutoScalingGroup) error { asgSvc := r.getASGService(clusterScope) diff --git a/exp/controllers/awsmachinepool_controller_test.go b/exp/controllers/awsmachinepool_controller_test.go index 4902dbb7e7..8c69346a61 100644 --- a/exp/controllers/awsmachinepool_controller_test.go +++ b/exp/controllers/awsmachinepool_controller_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" infrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2" @@ -101,6 +102,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { }, }, }, + Status: expinfrav1.AWSMachinePoolStatus{}, } secret = &corev1.Secret{ @@ -135,6 +137,11 @@ func TestAWSMachinePoolReconciler(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "mp", Namespace: "default", + UID: "1", + }, + TypeMeta: metav1.TypeMeta{ + APIVersion: "cluster.x-k8s.io/v1beta1", + Kind: "MachinePool", }, Spec: expclusterv1.MachinePoolSpec{ ClusterName: "test", @@ -173,6 +180,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { return reconSvc }, Recorder: recorder, + Client: testEnv.Client, } } @@ -211,7 +219,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { buf := new(bytes.Buffer) klog.SetOutput(buf) - _ = reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, _ = reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(buf).To(ContainSubstring("Error state detected, skipping reconciliation")) }) t.Run("should add our finalizer to the machinepool", func(t *testing.T) { @@ -220,7 +228,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { defer teardown(t, g) getASG(t, g) - _ = reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, _ = reconciler.reconcileNormal(context.Background(), ms, cs, cs) 
g.Expect(ms.AWSMachinePool.Finalizers).To(ContainElement(expinfrav1.MachinePoolFinalizer)) }) @@ -235,7 +243,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { buf := new(bytes.Buffer) klog.SetOutput(buf) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(BeNil()) g.Expect(buf.String()).To(ContainSubstring("Cluster infrastructure is not ready yet")) expectConditions(g, ms.AWSMachinePool, []conditionAssertion{{expinfrav1.ASGReadyCondition, corev1.ConditionFalse, clusterv1.ConditionSeverityInfo, infrav1.WaitingForClusterInfrastructureReason}}) @@ -250,7 +258,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { buf := new(bytes.Buffer) klog.SetOutput(buf) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(BeNil()) g.Expect(buf.String()).To(ContainSubstring("Bootstrap data secret reference is not yet available")) @@ -278,10 +286,145 @@ func TestAWSMachinePoolReconciler(t *testing.T) { expectedErr := errors.New("no connection available ") reconSvc.EXPECT().ReconcileLaunchTemplate(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(expectedErr) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(errors.Cause(err)).To(MatchError(expectedErr)) }) }) + t.Run("there are nodes in the asg which need awsmachines", func(t *testing.T) { + t.Run("should create awsmachines for the nodes", func(t *testing.T) { + g := NewWithT(t) + setup(t, g) + defer teardown(t, g) + + asg := &expinfrav1.AutoScalingGroup{ + Name: "name", + Instances: []infrav1.Instance{ + { + ID: "1", + }, + { + ID: "2", + }, + }, + Subnets: []string{}, + } + + reconSvc.EXPECT().ReconcileLaunchTemplate(gomock.Any(), ec2Svc, gomock.Any(), gomock.Any()).Return(nil) + asgSvc.EXPECT().GetASGByName(gomock.Any()).Return(asg, nil) + ec2Svc.EXPECT().InstanceIfExists(aws.String("1")).Return(&infrav1.Instance{ID: "1", Type: "m6.2xlarge"}, nil) + ec2Svc.EXPECT().InstanceIfExists(aws.String("2")).Return(&infrav1.Instance{ID: "2", Type: "m6.2xlarge"}, nil) + asgSvc.EXPECT().SubnetIDs(gomock.Any()).Return([]string{}, nil) + asgSvc.EXPECT().UpdateASG(gomock.Any()).Return(nil) + reconSvc.EXPECT().ReconcileTags(gomock.Any(), gomock.Any()).Return(nil) + + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + g.Expect(err).To(Succeed()) + + g.Eventually(func() int { + awsMachines := &infrav1.AWSMachineList{} + if err := testEnv.List(ctx, awsMachines, client.InNamespace(ms.AWSMachinePool.Namespace)); err != nil { + return -1 + } + return len(awsMachines.Items) + }).Should(BeEquivalentTo(len(asg.Instances))) + }) + t.Run("should delete awsmachines for nodes removed from the asg", func(t *testing.T) { + g := NewWithT(t) + setup(t, g) + defer teardown(t, g) + + asg := &expinfrav1.AutoScalingGroup{ + Name: "name", + Instances: []infrav1.Instance{ + { + ID: "1", + }, + }, + Subnets: []string{}, + } + g.Expect(testEnv.Create(context.Background(), &clusterv1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ms.AWSMachinePool.Namespace, + Name: "name-1", + UID: "1", + }, + Spec: clusterv1.MachineSpec{ + ClusterName: "test", + }, + })).To(Succeed()) + g.Expect(testEnv.Create(context.Background(), &infrav1.AWSMachine{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ms.AWSMachinePool.Namespace, + Name: "name-1", + 
Labels: map[string]string{ + clusterv1.MachinePoolNameLabel: ms.MachinePool.Name, + clusterv1.ClusterNameLabel: ms.MachinePool.Spec.ClusterName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1beta1", + Kind: "Machine", + Name: "name-1", + UID: "1", + }, + }, + }, + Spec: infrav1.AWSMachineSpec{ + ProviderID: aws.String("1"), + InstanceType: "m6.2xlarge", + }, + })).To(Succeed()) + g.Expect(testEnv.Create(context.Background(), &clusterv1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ms.AWSMachinePool.Namespace, + Name: "name-2", + UID: "2", + }, + Spec: clusterv1.MachineSpec{ + ClusterName: "test", + }, + })).To(Succeed()) + g.Expect(testEnv.Create(context.Background(), &infrav1.AWSMachine{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ms.AWSMachinePool.Namespace, + Name: "name-2", + Labels: map[string]string{ + clusterv1.MachinePoolNameLabel: ms.MachinePool.Name, + clusterv1.ClusterNameLabel: ms.MachinePool.Spec.ClusterName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1beta1", + Kind: "Machine", + Name: "name-2", + UID: "2", + }, + }, + }, + Spec: infrav1.AWSMachineSpec{ + ProviderID: aws.String("2"), + InstanceType: "m6.2xlarge", + }, + })).To(Succeed()) + + reconSvc.EXPECT().ReconcileLaunchTemplate(gomock.Any(), ec2Svc, gomock.Any(), gomock.Any()).Return(nil) + asgSvc.EXPECT().GetASGByName(gomock.Any()).Return(asg, nil) + asgSvc.EXPECT().SubnetIDs(gomock.Any()).Return([]string{}, nil) + asgSvc.EXPECT().UpdateASG(gomock.Any()).Return(nil) + reconSvc.EXPECT().ReconcileTags(gomock.Any(), gomock.Any()).Return(nil) + + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + g.Expect(err).To(Succeed()) + + g.Eventually(func() int { + awsMachines := &infrav1.AWSMachineList{} + if err := testEnv.List(ctx, awsMachines, client.InNamespace(ms.AWSMachinePool.Namespace)); err != nil { + return -1 + } + return len(awsMachines.Items) + }).Should(BeEquivalentTo(len(asg.Instances))) + }) + }) t.Run("there's suspended processes provided during ASG creation", func(t *testing.T) { setSuspendedProcesses := func(t *testing.T, g *WithT) { t.Helper() @@ -305,7 +448,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { }, nil) asgSvc.EXPECT().SuspendProcesses("name", []string{"Launch", "Terminate"}).Return(nil).AnyTimes().Times(0) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(Succeed()) }) }) @@ -341,7 +484,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { "ReplaceUnhealthy", })).Return(nil).AnyTimes().Times(1) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(Succeed()) }) }) @@ -373,7 +516,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { asgSvc.EXPECT().SuspendProcesses("name", []string{"Terminate"}).Return(nil).AnyTimes().Times(1) asgSvc.EXPECT().ResumeProcesses("name", []string{"process3"}).Return(nil).AnyTimes().Times(1) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(Succeed()) }) }) @@ -398,9 +541,10 @@ func TestAWSMachinePoolReconciler(t *testing.T) { } ms.MachinePool.Spec.Replicas = ptr.To[int32](0) - g.Expect(testEnv.Create(ctx, ms.MachinePool)).To(Succeed()) + g.Expect(testEnv.Create(ctx, ms.MachinePool.DeepCopy())).To(Succeed()) - _ = reconciler.reconcileNormal(context.Background(), ms, 
cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + g.Expect(err).To(Succeed()) g.Expect(*ms.MachinePool.Spec.Replicas).To(Equal(int32(1))) }) t.Run("No need to update Asg because asgNeedsUpdates is false and no subnets change", func(t *testing.T) { @@ -431,7 +575,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { asgSvc.EXPECT().SubnetIDs(gomock.Any()).Return([]string{"subnet2", "subnet1"}, nil).Times(1) asgSvc.EXPECT().UpdateASG(gomock.Any()).Return(nil).Times(0) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(Succeed()) }) t.Run("update Asg due to subnet changes", func(t *testing.T) { @@ -449,7 +593,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { asgSvc.EXPECT().SubnetIDs(gomock.Any()).Return([]string{"subnet1"}, nil).Times(1) asgSvc.EXPECT().UpdateASG(gomock.Any()).Return(nil).Times(1) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(Succeed()) }) t.Run("update Asg due to asgNeedsUpdates returns true", func(t *testing.T) { @@ -467,7 +611,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { asgSvc.EXPECT().SubnetIDs(gomock.Any()).Return([]string{}, nil).Times(1) asgSvc.EXPECT().UpdateASG(gomock.Any()).Return(nil).Times(1) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(Succeed()) }) @@ -492,7 +636,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { }, nil }) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(Succeed()) }) @@ -539,7 +683,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { // No changes, so there must not be an ASG update! asgSvc.EXPECT().UpdateASG(gomock.Any()).Times(0) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(Succeed()) }) @@ -592,7 +736,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { // No changes, so there must not be an ASG update! asgSvc.EXPECT().UpdateASG(gomock.Any()).Times(0) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(Succeed()) }) @@ -648,7 +792,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { // No changes, so there must not be an ASG update! asgSvc.EXPECT().UpdateASG(gomock.Any()).Times(0) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(Succeed()) }) @@ -670,7 +814,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { }, nil }) - err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(Succeed()) g.Expect(ms.AWSMachinePool.Status.LaunchTemplateID).ToNot(BeEmpty()) @@ -732,7 +876,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { // No changes, so there must not be an ASG update! 
asgSvc.EXPECT().UpdateASG(gomock.Any()).Times(0) - err = reconciler.reconcileNormal(context.Background(), ms, cs, cs) + _, err = reconciler.reconcileNormal(context.Background(), ms, cs, cs) g.Expect(err).To(Succeed()) }) }) @@ -756,7 +900,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { expectedErr := errors.New("no connection available ") asgSvc.EXPECT().GetASGByName(gomock.Any()).Return(nil, expectedErr).AnyTimes() - err := reconciler.reconcileDelete(ms, cs, cs) + err := reconciler.reconcileDelete(context.Background(), ms, cs, cs) g.Expect(errors.Cause(err)).To(MatchError(expectedErr)) }) t.Run("should log and remove finalizer when no machinepool exists", func(t *testing.T) { @@ -771,7 +915,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) { buf := new(bytes.Buffer) klog.SetOutput(buf) - err := reconciler.reconcileDelete(ms, cs, cs) + err := reconciler.reconcileDelete(context.Background(), ms, cs, cs) g.Expect(err).To(BeNil()) g.Expect(buf.String()).To(ContainSubstring("Unable to locate ASG")) g.Expect(ms.AWSMachinePool.Finalizers).To(ConsistOf(metav1.FinalizerDeleteDependents)) @@ -792,7 +936,8 @@ func TestAWSMachinePoolReconciler(t *testing.T) { buf := new(bytes.Buffer) klog.SetOutput(buf) - err := reconciler.reconcileDelete(ms, cs, cs) + err := reconciler.reconcileDelete(context.Background(), ms, cs, cs) + g.Expect(err).To(BeNil()) g.Expect(ms.AWSMachinePool.Status.Ready).To(BeFalse()) g.Eventually(recorder.Events).Should(Receive(ContainSubstring("DeletionInProgress"))) diff --git a/exp/controllers/awsmanagedmachinepool_controller.go b/exp/controllers/awsmanagedmachinepool_controller.go index 8c0d75c2ec..6a0fb06960 100644 --- a/exp/controllers/awsmanagedmachinepool_controller.go +++ b/exp/controllers/awsmanagedmachinepool_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "fmt" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" @@ -36,8 +37,10 @@ import ( ekscontrolplanev1 "sigs.k8s.io/cluster-api-provider-aws/v2/controlplane/eks/api/v1beta2" expinfrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/exp/api/v1beta2" + "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud" "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/scope" "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/services" + asgsvc "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/services/autoscaling" "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/services/ec2" "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/services/eks" "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/logger" @@ -46,6 +49,7 @@ import ( "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/conditions" + "sigs.k8s.io/cluster-api/util/patch" "sigs.k8s.io/cluster-api/util/predicates" ) @@ -137,6 +141,16 @@ func (r *AWSManagedMachinePoolReconciler) Reconcile(ctx context.Context, req ctr return reconcile.Result{}, nil } + ampHelper, err := patch.NewHelper(awsPool, r.Client) + if err != nil { + return ctrl.Result{}, errors.Wrap(err, "failed to init AWSMachinePool patch helper") + } + awsPool.Status.InfrastructureMachineKind = "AWSMachine" + // Patch now so that the status and selectors are available. 
+ if err := ampHelper.Patch(ctx, awsPool); err != nil { + return ctrl.Result{}, err + } + managedControlPlaneScope, err := scope.NewManagedControlPlaneScope(scope.ManagedControlPlaneScopeParams{ Client: r.Client, Logger: log, @@ -206,9 +220,16 @@ func (r *AWSManagedMachinePoolReconciler) reconcileNormal( } ekssvc := eks.NewNodegroupService(machinePoolScope) + asgsvc := r.getASGService(ec2Scope) ec2svc := r.getEC2Service(ec2Scope) reconSvc := r.getReconcileService(ec2Scope) + asgName := machinePoolScope.Name() + asg, err := asgsvc.ASGIfExists(&asgName) + if err != nil { + return fmt.Errorf("failed to query asg for %s: %w", asgName, err) + } + if machinePoolScope.ManagedMachinePool.Spec.AWSLaunchTemplate != nil { canUpdateLaunchTemplate := func() (bool, error) { return true, nil @@ -236,6 +257,23 @@ func (r *AWSManagedMachinePoolReconciler) reconcileNormal( conditions.MarkTrue(machinePoolScope.ManagedMachinePool, expinfrav1.LaunchTemplateReadyCondition) } + awsMachineList, err := getAWSMachines(ctx, machinePoolScope.MachinePool, r.Client) + if err != nil { + return err + } + + if err := createAWSMachinesIfNotExists(ctx, awsMachineList, machinePoolScope.MachinePool, asg, machinePoolScope.GetLogger(), r.Client, ec2svc); err != nil { + machinePoolScope.ManagedMachinePool.Status.Ready = false + conditions.MarkFalse(machinePoolScope.ManagedMachinePool, clusterv1.ReadyCondition, expinfrav1.AWSMachineCreationFailed, clusterv1.ConditionSeverityWarning, "%s", err.Error()) + return fmt.Errorf("failed to create missing awsmachines: %w", err) + } + + if err := deleteOrphanedAWSMachines(ctx, awsMachineList, asg, machinePoolScope.GetLogger(), r.Client); err != nil { + machinePoolScope.ManagedMachinePool.Status.Ready = false + conditions.MarkFalse(machinePoolScope.ManagedMachinePool, clusterv1.ReadyCondition, expinfrav1.AWSMachineDeletionFailed, clusterv1.ConditionSeverityWarning, "%s", err.Error()) + return fmt.Errorf("failed to clean up dangling awsmachines: %w", err) + } + if err := ekssvc.ReconcilePool(ctx); err != nil { return errors.Wrapf(err, "failed to reconcile machine pool for AWSManagedMachinePool %s/%s", machinePoolScope.ManagedMachinePool.Namespace, machinePoolScope.ManagedMachinePool.Name) } @@ -244,12 +282,16 @@ func (r *AWSManagedMachinePoolReconciler) reconcileNormal( } func (r *AWSManagedMachinePoolReconciler) reconcileDelete( - _ context.Context, + ctx context.Context, machinePoolScope *scope.ManagedMachinePoolScope, ec2Scope scope.EC2Scope, ) error { machinePoolScope.Info("Reconciling deletion of AWSManagedMachinePool") + if err := reconcileDeleteAWSMachines(ctx, machinePoolScope.MachinePool, r.Client, machinePoolScope.GetLogger()); err != nil { + return err + } + ekssvc := eks.NewNodegroupService(machinePoolScope) ec2Svc := ec2.NewService(ec2Scope) @@ -345,6 +387,10 @@ func managedControlPlaneToManagedMachinePoolMapFunc(c client.Client, gvk schema. 
} } +func (r *AWSManagedMachinePoolReconciler) getASGService(scope cloud.ClusterScoper) services.ASGInterface { + return asgsvc.NewService(scope) +} + func (r *AWSManagedMachinePoolReconciler) getEC2Service(scope scope.EC2Scope) services.EC2Interface { return ec2.NewService(scope) } diff --git a/pkg/cloud/awserrors/errors.go b/pkg/cloud/awserrors/errors.go index d51b41595c..765d3ce626 100644 --- a/pkg/cloud/awserrors/errors.go +++ b/pkg/cloud/awserrors/errors.go @@ -56,6 +56,7 @@ const ( VPCNotFound = "InvalidVpcID.NotFound" VPCMissingParameter = "MissingParameter" ErrCodeRepositoryAlreadyExistsException = "RepositoryAlreadyExistsException" + ASGNotFound = "AutoScalingGroup.NotFound" ) var _ error = &EC2Error{} @@ -172,6 +173,8 @@ func IsInvalidNotFoundError(err error) bool { return true case LaunchTemplateNameNotFound: return true + case ASGNotFound: + return true } } diff --git a/pkg/cloud/scope/machine.go b/pkg/cloud/scope/machine.go index 331c4c31e2..a61d72479a 100644 --- a/pkg/cloud/scope/machine.go +++ b/pkg/cloud/scope/machine.go @@ -29,6 +29,7 @@ import ( infrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2" ekscontrolplanev1 "sigs.k8s.io/cluster-api-provider-aws/v2/controlplane/eks/api/v1beta2" + "sigs.k8s.io/cluster-api-provider-aws/v2/exp/api/v1beta2" "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/logger" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" capierrors "sigs.k8s.io/cluster-api/errors" @@ -114,6 +115,19 @@ func (m *MachineScope) IsControlPlane() bool { return util.IsControlPlaneMachine(m.Machine) } +// IsMachinePoolMachine returns true if the machine is created for a machinepool. +func (m *MachineScope) IsMachinePoolMachine() bool { + if _, ok := m.Machine.GetLabels()[clusterv1.MachinePoolNameLabel]; ok { + return true + } + for _, owner := range m.Machine.OwnerReferences { + if owner.Kind == v1beta2.KindMachinePool { + return true + } + } + return false +} + // Role returns the machine role from the labels. func (m *MachineScope) Role() string { if util.IsControlPlaneMachine(m.Machine) { diff --git a/pkg/cloud/services/ec2/instances.go b/pkg/cloud/services/ec2/instances.go index 0742b0c589..559744c3bd 100644 --- a/pkg/cloud/services/ec2/instances.go +++ b/pkg/cloud/services/ec2/instances.go @@ -1072,7 +1072,7 @@ func (s *Service) GetDHCPOptionSetDomainName(ec2client ec2iface.EC2API, vpcID *s log := s.scope.GetLogger() if vpcID == nil { - log.Info("vpcID is nil, skipping DHCP Option Set discovery") + log.V(4).Info("vpcID is nil, skipping DHCP Option Set discovery") return nil }
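Illustrative sketch (not part of the diff): AWSMachines are matched to ASG instances by provider ID, built as aws:///<availability-zone>/<instance-id> exactly as in createAWSMachinesIfNotExists and deleteOrphanedAWSMachines above. The helper names below are hypothetical.

package example

import (
	"fmt"

	infrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2"
	expinfrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/exp/api/v1beta2"
)

// asgProviderID builds the provider ID used to match an ASG instance to an AWSMachine.
func asgProviderID(i infrav1.Instance) string {
	return fmt.Sprintf("aws:///%s/%s", i.AvailabilityZone, i.ID)
}

// orphanedProviderIDs returns the provider IDs of AWSMachines whose EC2 instance is
// no longer part of the ASG; these are the AWSMachines that get cleaned up.
func orphanedProviderIDs(machines *infrav1.AWSMachineList, asg *expinfrav1.AutoScalingGroup) []string {
	inASG := make(map[string]struct{}, len(asg.Instances))
	for _, inst := range asg.Instances {
		inASG[asgProviderID(inst)] = struct{}{}
	}

	var orphaned []string
	for _, m := range machines.Items {
		if m.Spec.ProviderID == nil || *m.Spec.ProviderID == "" {
			continue
		}
		if _, ok := inASG[*m.Spec.ProviderID]; !ok {
			orphaned = append(orphaned, *m.Spec.ProviderID)
		}
	}
	return orphaned
}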