diff --git a/exp/controllers/rosamachinepool_controller.go b/exp/controllers/rosamachinepool_controller.go
index b05a0fccf5..2dae9b6f9e 100644
--- a/exp/controllers/rosamachinepool_controller.go
+++ b/exp/controllers/rosamachinepool_controller.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/ec2"
 	"github.com/blang/semver"
 	"github.com/google/go-cmp/cmp"
 	cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
@@ -15,6 +17,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
+	"k8s.io/utils/ptr"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
@@ -130,6 +133,7 @@ func (r *ROSAMachinePoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 		MachinePool:     machinePool,
 		RosaMachinePool: rosaMachinePool,
 		Logger:          log,
+		Endpoints:       r.Endpoints,
 	})
 	if err != nil {
 		return ctrl.Result{}, errors.Wrap(err, "failed to create scope")
@@ -198,6 +202,17 @@ func (r *ROSAMachinePoolReconciler) reconcileNormal(ctx context.Context,
 	}
 
 	rosaMachinePool := machinePoolScope.RosaMachinePool
+	machinePool := machinePoolScope.MachinePool
+
+	if rosaMachinePool.Spec.Autoscaling != nil && !annotations.ReplicasManagedByExternalAutoscaler(machinePool) {
+		// make sure cluster.x-k8s.io/replicas-managed-by annotation is set on CAPI MachinePool when autoscaling is enabled.
+		annotations.AddAnnotations(machinePool, map[string]string{
+			clusterv1.ReplicasManagedByAnnotation: "rosa",
+		})
+		if err := machinePoolScope.PatchCAPIMachinePoolObject(ctx); err != nil {
+			return ctrl.Result{}, err
+		}
+	}
 
 	nodePool, found, err := ocmClient.GetNodePool(machinePoolScope.ControlPlane.Status.ID, rosaMachinePool.Spec.NodePoolName)
 	if err != nil {
@@ -210,9 +225,25 @@ func (r *ROSAMachinePoolReconciler) reconcileNormal(ctx context.Context,
 			return ctrl.Result{}, fmt.Errorf("failed to ensure rosaMachinePool: %w", err)
 		}
 
-		// TODO (alberto): discover and store providerIDs from aws so the CAPI controller can match then to Nodes and report readiness.
-		rosaMachinePool.Status.Replicas = int32(nodePool.Status().CurrentReplicas())
-		if nodePool.Replicas() == nodePool.Status().CurrentReplicas() && nodePool.Status().Message() == "" {
+		currentReplicas := int32(nodePool.Status().CurrentReplicas())
+		if annotations.ReplicasManagedByExternalAutoscaler(machinePool) {
+			// Set MachinePool replicas to rosa autoscaling replicas
+			if *machinePool.Spec.Replicas != currentReplicas {
+				machinePoolScope.Info("Setting MachinePool replicas to rosa autoscaling replicas",
+					"local", *machinePool.Spec.Replicas,
+					"external", currentReplicas)
+				machinePool.Spec.Replicas = &currentReplicas
+				if err := machinePoolScope.PatchCAPIMachinePoolObject(ctx); err != nil {
+					return ctrl.Result{}, err
+				}
+			}
+		}
+		if err := r.reconcileProviderIDList(ctx, machinePoolScope, nodePool); err != nil {
+			return ctrl.Result{}, fmt.Errorf("failed to reconcile ProviderIDList: %w", err)
+		}
+
+		rosaMachinePool.Status.Replicas = currentReplicas
+		if rosa.IsNodePoolReady(nodePool) {
 			conditions.MarkTrue(rosaMachinePool, expinfrav1.RosaMachinePoolReadyCondition)
 			rosaMachinePool.Status.Ready = true
 
@@ -234,7 +265,7 @@ func (r *ROSAMachinePoolReconciler) reconcileNormal(ctx context.Context,
 		return ctrl.Result{RequeueAfter: time.Second * 60}, nil
 	}
-	npBuilder := nodePoolBuilder(rosaMachinePool.Spec, machinePoolScope.MachinePool.Spec)
+	npBuilder := nodePoolBuilder(rosaMachinePool.Spec, machinePool.Spec)
 	nodePoolSpec, err := npBuilder.Build()
 	if err != nil {
 		return ctrl.Result{}, fmt.Errorf("failed to build rosa nodepool: %w", err)
 	}
@@ -294,20 +325,7 @@ func (r *ROSAMachinePoolReconciler) reconcileMachinePoolVersion(machinePoolScope
 	}
 
 	if scheduledUpgrade == nil {
-		policy, err := ocmClient.BuildNodeUpgradePolicy(version, nodePool.ID(), ocm.UpgradeScheduling{
-			AutomaticUpgrades: false,
-			// The OCM API places guardrails around the minimum and maximum delay that a user can request,
-			// for the next run of the upgrade, which is [5min,6mo]. Set our next run request to something
-			// slightly longer than 5min to make sure we account for the latency between when we send this
-			// request and when the server processes it.
-			// https://gitlab.cee.redhat.com/service/uhc-clusters-service/-/blob/master/cmd/clusters-service/servecmd/apiserver/upgrade_policy_handlers.go
-			NextRun: time.Now().Add(6 * time.Minute),
-		})
-		if err != nil {
-			return fmt.Errorf("failed to create nodePool upgrade schedule to version %s: %w", version, err)
-		}
-
-		scheduledUpgrade, err = ocmClient.ScheduleNodePoolUpgrade(clusterID, nodePool.ID(), policy)
+		scheduledUpgrade, err = rosa.ScheduleNodePoolUpgrade(ocmClient, clusterID, nodePool, version, time.Now())
 		if err != nil {
 			return fmt.Errorf("failed to schedule nodePool upgrade to version %s: %w", version, err)
 		}
@@ -453,6 +471,47 @@ func nodePoolToRosaMachinePoolSpec(nodePool *cmv1.NodePool) expinfrav1.RosaMachi
 	return spec
 }
 
+func (r *ROSAMachinePoolReconciler) reconcileProviderIDList(ctx context.Context, machinePoolScope *scope.RosaMachinePoolScope, nodePool *cmv1.NodePool) error {
+	tags := nodePool.AWSNodePool().Tags()
+	if len(tags) == 0 {
+		// can't identify EC2 instances belonging to this NodePool without tags.
+		return nil
+	}
+
+	ec2Svc := scope.NewEC2Client(machinePoolScope, machinePoolScope, &machinePoolScope.Logger, machinePoolScope.InfraCluster())
+	response, err := ec2Svc.DescribeInstancesWithContext(ctx, &ec2.DescribeInstancesInput{
+		Filters: buildEC2FiltersFromTags(tags),
+	})
+	if err != nil {
+		return err
+	}
+
+	var providerIDList []string
+	for _, reservation := range response.Reservations {
+		for _, instance := range reservation.Instances {
+			providerID := scope.GenerateProviderID(*instance.Placement.AvailabilityZone, *instance.InstanceId)
+			providerIDList = append(providerIDList, providerID)
+		}
+	}
+
+	machinePoolScope.RosaMachinePool.Spec.ProviderIDList = providerIDList
+	return nil
+}
+
+func buildEC2FiltersFromTags(tags map[string]string) []*ec2.Filter {
+	filters := make([]*ec2.Filter, 0, len(tags))
+	for key, value := range tags {
+		filters = append(filters, &ec2.Filter{
+			Name: ptr.To(fmt.Sprintf("tag:%s", key)),
+			Values: aws.StringSlice([]string{
+				value,
+			}),
+		})
+	}
+
+	return filters
+}
+
 func rosaControlPlaneToRosaMachinePoolMapFunc(c client.Client, gvk schema.GroupVersionKind, log logger.Wrapper) handler.MapFunc {
 	return func(ctx context.Context, o client.Object) []reconcile.Request {
 		rosaControlPlane, ok := o.(*rosacontrolplanev1.ROSAControlPlane)
diff --git a/pkg/cloud/scope/machine.go b/pkg/cloud/scope/machine.go
index f69fa572a5..ddbabe6376 100644
--- a/pkg/cloud/scope/machine.go
+++ b/pkg/cloud/scope/machine.go
@@ -19,7 +19,6 @@ package scope
 import (
 	"context"
 	"encoding/base64"
-	"fmt"
 
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
@@ -142,7 +141,7 @@ func (m *MachineScope) GetProviderID() string {
 
 // SetProviderID sets the AWSMachine providerID in spec.
 func (m *MachineScope) SetProviderID(instanceID, availabilityZone string) {
-	providerID := fmt.Sprintf("aws:///%s/%s", availabilityZone, instanceID)
+	providerID := GenerateProviderID(availabilityZone, instanceID)
 	m.AWSMachine.Spec.ProviderID = ptr.To[string](providerID)
 }
 
diff --git a/pkg/cloud/scope/providerid.go b/pkg/cloud/scope/providerid.go
index ecf3feea19..1b11135ce4 100644
--- a/pkg/cloud/scope/providerid.go
+++ b/pkg/cloud/scope/providerid.go
@@ -17,6 +17,7 @@ limitations under the License.
 
 package scope
 
 import (
+	"fmt"
 	"regexp"
 	"strings"
@@ -124,3 +125,14 @@ func (p *ProviderID) Validate() bool {
 func (p *ProviderID) IndexKey() string {
 	return p.String()
 }
+
+// ProviderIDPrefix is the prefix prepended to AWS resource IDs to form the Kubernetes Provider ID.
+// NOTE: this format matches the 2 slashes format used in cloud-provider and cluster-autoscaler.
+const ProviderIDPrefix = "aws://"
+
+// GenerateProviderID generates a valid AWS Node/Machine ProviderID field.
+//
+// By default, the last ID provided is used as the identifier (last part).
+func GenerateProviderID(ids ...string) string {
+	return fmt.Sprintf("%s/%s", ProviderIDPrefix, strings.Join(ids, "/"))
+}
diff --git a/pkg/cloud/scope/providerid_test.go b/pkg/cloud/scope/providerid_test.go
new file mode 100644
index 0000000000..df6011f8d2
--- /dev/null
+++ b/pkg/cloud/scope/providerid_test.go
@@ -0,0 +1,55 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scope
+
+import (
+	"testing"
+
+	. "github.com/onsi/gomega"
+)
+
+func TestGenerateProviderID(t *testing.T) {
+	testCases := []struct {
+		ids []string
+
+		expectedProviderID string
+	}{
+		{
+			ids: []string{
+				"eu-west-1a",
+				"instance-id",
+			},
+			expectedProviderID: "aws:///eu-west-1a/instance-id",
+		},
+		{
+			ids: []string{
+				"eu-west-1a",
+				"test-id1",
+				"test-id2",
+				"instance-id",
+			},
+			expectedProviderID: "aws:///eu-west-1a/test-id1/test-id2/instance-id",
+		},
+	}
+
+	for _, tc := range testCases {
+		g := NewGomegaWithT(t)
+		providerID := GenerateProviderID(tc.ids...)
+
+		g.Expect(providerID).To(Equal(tc.expectedProviderID))
+	}
+}
diff --git a/pkg/cloud/scope/rosamachinepool.go b/pkg/cloud/scope/rosamachinepool.go
index c39372b6b0..b420838c54 100644
--- a/pkg/cloud/scope/rosamachinepool.go
+++ b/pkg/cloud/scope/rosamachinepool.go
@@ -19,13 +19,16 @@ package scope
 import (
 	"context"
 
+	awsclient "github.com/aws/aws-sdk-go/aws/client"
 	"github.com/pkg/errors"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
+	"sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2"
 	rosacontrolplanev1 "sigs.k8s.io/cluster-api-provider-aws/v2/controlplane/rosa/api/v1beta2"
 	expinfrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/exp/api/v1beta2"
 	"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud"
+	"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/throttle"
 	"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/logger"
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
 	expclusterv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
@@ -42,6 +45,8 @@ type RosaMachinePoolScopeParams struct {
 	RosaMachinePool *expinfrav1.ROSAMachinePool
 	MachinePool     *expclusterv1.MachinePool
 	ControllerName  string
+
+	Endpoints []ServiceEndpoint
 }
 
 // NewRosaMachinePoolScope creates a new Scope from the supplied parameters.
@@ -70,7 +75,7 @@ func NewRosaMachinePoolScope(params RosaMachinePoolScopeParams) (*RosaMachinePoo
 		return nil, errors.Wrap(err, "failed to init MachinePool patch helper")
 	}
 
-	return &RosaMachinePoolScope{
+	scope := &RosaMachinePoolScope{
 		Logger:          *params.Logger,
 		Client:          params.Client,
 		patchHelper:     ammpHelper,
@@ -81,9 +86,22 @@ func NewRosaMachinePoolScope(params RosaMachinePoolScopeParams) (*RosaMachinePoo
 		RosaMachinePool: params.RosaMachinePool,
 		MachinePool:     params.MachinePool,
 		controllerName:  params.ControllerName,
-	}, nil
+	}
+
+	session, serviceLimiters, err := sessionForClusterWithRegion(params.Client, scope, params.ControlPlane.Spec.Region, params.Endpoints, params.Logger)
+	if err != nil {
+		return nil, errors.Errorf("failed to create aws session: %v", err)
+	}
+
+	scope.session = session
+	scope.serviceLimiters = serviceLimiters
+
+	return scope, nil
 }
 
+var _ cloud.Session = &RosaMachinePoolScope{}
+var _ cloud.SessionMetadata = &RosaMachinePoolScope{}
+
 // RosaMachinePoolScope defines the basic context for an actuator to operate upon.
 type RosaMachinePoolScope struct {
 	logger.Logger
 
@@ -96,6 +114,9 @@ type RosaMachinePoolScope struct {
 	RosaMachinePool *expinfrav1.ROSAMachinePool
 	MachinePool     *expclusterv1.MachinePool
 
+	session         awsclient.ConfigProvider
+	serviceLimiters throttle.ServiceLimiters
+
 	controllerName string
 }
 
@@ -139,6 +160,34 @@ func (s *RosaMachinePoolScope) GetSetter() conditions.Setter {
 	return s.RosaMachinePool
 }
 
+// ServiceLimiter implements cloud.Session.
+func (s *RosaMachinePoolScope) ServiceLimiter(service string) *throttle.ServiceLimiter {
+	if sl, ok := s.serviceLimiters[service]; ok {
+		return sl
+	}
+	return nil
+}
+
+// Session implements cloud.Session.
+func (s *RosaMachinePoolScope) Session() awsclient.ConfigProvider {
+	return s.session
+}
+
+// IdentityRef implements cloud.SessionMetadata.
+func (s *RosaMachinePoolScope) IdentityRef() *v1beta2.AWSIdentityReference {
+	return s.ControlPlane.Spec.IdentityRef
+}
+
+// InfraClusterName implements cloud.SessionMetadata.
+func (s *RosaMachinePoolScope) InfraClusterName() string {
+	return s.ControlPlane.Name
+}
+
+// Namespace implements cloud.SessionMetadata.
+func (s *RosaMachinePoolScope) Namespace() string {
+	return s.Cluster.Namespace
+}
+
 // RosaMchinePoolReadyFalse marks the ready condition false using warning if error isn't
 // empty.
 func (s *RosaMachinePoolScope) RosaMchinePoolReadyFalse(reason string, err string) error {
diff --git a/pkg/rosa/helpers.go b/pkg/rosa/helpers.go
new file mode 100644
index 0000000000..39f6058069
--- /dev/null
+++ b/pkg/rosa/helpers.go
@@ -0,0 +1,23 @@
+package rosa
+
+import (
+	cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
+)
+
+// IsNodePoolReady checks whether the NodePool is provisioned and all replicas are available.
+// If autoscaling is enabled, the NodePool must have current replicas >= autoscaling MinReplica to be considered ready.
+func IsNodePoolReady(nodePool *cmv1.NodePool) bool {
+	if nodePool.Status().Message() != "" {
+		return false
+	}
+
+	if nodePool.Replicas() != 0 {
+		return nodePool.Replicas() == nodePool.Status().CurrentReplicas()
+	}
+
+	if nodePool.Autoscaling() != nil {
+		return nodePool.Status().CurrentReplicas() >= nodePool.Autoscaling().MinReplica()
+	}
+
+	return false
+}
diff --git a/pkg/rosa/versions.go b/pkg/rosa/versions.go
index 2949d3b5cf..44d6fcecdb 100644
--- a/pkg/rosa/versions.go
+++ b/pkg/rosa/versions.go
@@ -1,6 +1,7 @@
 package rosa
 
 import (
+	"fmt"
 	"time"
 
 	"github.com/blang/semver"
@@ -27,7 +28,8 @@ func CheckExistingScheduledUpgrade(client *ocm.Client, cluster *cmv1.Cluster) (*
 // ScheduleControlPlaneUpgrade schedules a new control plane upgrade to the specified version at the specified time.
 func ScheduleControlPlaneUpgrade(client *ocm.Client, cluster *cmv1.Cluster, version string, nextRun time.Time) (*cmv1.ControlPlaneUpgradePolicy, error) {
 	// earliestNextRun is set to at least 5 min from now by the OCM API.
-	// we set it to 6 min here to account for latencty.
+	// Set our next run request to something slightly longer than 5min to make sure we account for the latency between when we send this
+	// request and when the server processes it.
 	earliestNextRun := time.Now().Add(time.Minute * 6)
 	if nextRun.Before(earliestNextRun) {
 		nextRun = earliestNextRun
@@ -45,6 +47,35 @@ func ScheduleControlPlaneUpgrade(client *ocm.Client, cluster *cmv1.Cluster, vers
 	return client.ScheduleHypershiftControlPlaneUpgrade(cluster.ID(), upgradePolicy)
 }
 
+// ScheduleNodePoolUpgrade schedules a new nodePool upgrade to the specified version at the specified time.
+func ScheduleNodePoolUpgrade(client *ocm.Client, clusterID string, nodePool *cmv1.NodePool, version string, nextRun time.Time) (*cmv1.NodePoolUpgradePolicy, error) {
+	// earliestNextRun is set to at least 5 min from now by the OCM API.
+	// Set our next run request to something slightly longer than 5min to make sure we account for the latency between when we send this
+	// request and when the server processes it.
+	earliestNextRun := time.Now().Add(time.Minute * 6)
+	if nextRun.Before(earliestNextRun) {
+		nextRun = earliestNextRun
+	}
+
+	upgradePolicy, err := cmv1.NewNodePoolUpgradePolicy().
+		UpgradeType(cmv1.UpgradeTypeNodePool).
+		NodePoolID(nodePool.ID()).
+		ScheduleType(cmv1.ScheduleTypeManual).
+		Version(version).
+		NextRun(nextRun).
+		Build()
+	if err != nil {
+		return nil, err
+	}
+
+	scheduledUpgrade, err := client.ScheduleNodePoolUpgrade(clusterID, nodePool.ID(), upgradePolicy)
+	if err != nil {
+		return nil, fmt.Errorf("failed to schedule nodePool upgrade to version %s: %w", version, err)
+	}
+
+	return scheduledUpgrade, nil
+}
+
 // machinepools can be created with a minimal of two minor versions from the control plane.
 const minorVersionsAllowedDeviation = 2
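
For context, here is a self-contained sketch (not part of the patch) of the two data shapes the new reconcileProviderIDList path works with: one tag:<key> EC2 filter per NodePool AWS tag, mirroring buildEC2FiltersFromTags, and provider IDs in the aws:///<availability-zone>/<instance-id> form produced by scope.GenerateProviderID. The local helper names, tag key/value, and instance details below are hypothetical examples, not values taken from the patch.

package main

import (
	"fmt"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/ec2"
	"k8s.io/utils/ptr"
)

// buildFilters mirrors buildEC2FiltersFromTags: one "tag:<key>" EC2 filter per NodePool AWS tag.
func buildFilters(tags map[string]string) []*ec2.Filter {
	filters := make([]*ec2.Filter, 0, len(tags))
	for key, value := range tags {
		filters = append(filters, &ec2.Filter{
			Name:   ptr.To(fmt.Sprintf("tag:%s", key)),
			Values: aws.StringSlice([]string{value}),
		})
	}
	return filters
}

// generateProviderID mirrors scope.GenerateProviderID: "aws://" + "/" + strings.Join(ids, "/").
func generateProviderID(ids ...string) string {
	return fmt.Sprintf("aws:///%s", strings.Join(ids, "/"))
}

func main() {
	// Hypothetical NodePool tag; the controller reads the real map from nodePool.AWSNodePool().Tags().
	tags := map[string]string{"api.openshift.com/nodepool": "my-nodepool"}
	for _, filter := range buildFilters(tags) {
		fmt.Println(*filter.Name, aws.StringValueSlice(filter.Values)) // tag:api.openshift.com/nodepool [my-nodepool]
	}

	// Hypothetical availability zone and instance ID, as returned by DescribeInstances.
	fmt.Println(generateProviderID("us-east-1a", "i-0123456789abcdef0")) // aws:///us-east-1a/i-0123456789abcdef0
}

Keeping this three-slash form consistent matters because, per the comment added in providerid.go and the TODO this patch removes, the provider IDs stored in ROSAMachinePool.Spec.ProviderIDList are matched against the Node provider IDs set by cloud-provider and consumed by cluster-autoscaler.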
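One behavior shared by ScheduleControlPlaneUpgrade and the new ScheduleNodePoolUpgrade is the next-run clamp: the OCM API only accepts upgrade policies scheduled at least roughly five minutes out, so both helpers push any earlier request (including the time.Now() the controller passes in) to about now+6m. A minimal standalone sketch of that guard, with a hypothetical helper name:

package main

import (
	"fmt"
	"time"
)

// clampNextRun mirrors the guard both upgrade helpers apply: requests earlier than
// now+6m are pushed out so the policy still clears the OCM minimum after request latency.
func clampNextRun(requested time.Time) time.Time {
	earliest := time.Now().Add(6 * time.Minute)
	if requested.Before(earliest) {
		return earliest
	}
	return requested
}

func main() {
	// The controller passes time.Now(), which is always before now+6m,
	// so the policy's NextRun lands about six minutes in the future.
	fmt.Println(clampNextRun(time.Now()))

	// An explicit future time (for example a maintenance window) is kept as-is.
	fmt.Println(clampNextRun(time.Now().Add(2 * time.Hour)))
}

Passing time.Now() from the controller therefore simply means "upgrade as soon as OCM allows".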