
Merge pull request #4831 from muraee/machinepool-providerid-list
✨ ROSA: Reconcile ROSAMachinePool.spec.ProviderIDList
k8s-ci-robot authored Mar 4, 2024
2 parents f0a5ecf + 88c31a9 commit c9d6ab1
Showing 7 changed files with 251 additions and 23 deletions.
95 changes: 77 additions & 18 deletions exp/controllers/rosamachinepool_controller.go
@@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/blang/semver"
"github.com/google/go-cmp/cmp"
cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
@@ -15,6 +17,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
@@ -130,6 +133,7 @@ func (r *ROSAMachinePoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
MachinePool: machinePool,
RosaMachinePool: rosaMachinePool,
Logger: log,
Endpoints: r.Endpoints,
})
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to create scope")
@@ -198,6 +202,17 @@ func (r *ROSAMachinePoolReconciler) reconcileNormal(ctx context.Context,
}

rosaMachinePool := machinePoolScope.RosaMachinePool
machinePool := machinePoolScope.MachinePool

if rosaMachinePool.Spec.Autoscaling != nil && !annotations.ReplicasManagedByExternalAutoscaler(machinePool) {
// make sure the cluster.x-k8s.io/replicas-managed-by annotation is set on the CAPI MachinePool when autoscaling is enabled.
annotations.AddAnnotations(machinePool, map[string]string{
clusterv1.ReplicasManagedByAnnotation: "rosa",
})
if err := machinePoolScope.PatchCAPIMachinePoolObject(ctx); err != nil {
return ctrl.Result{}, err
}
}

nodePool, found, err := ocmClient.GetNodePool(machinePoolScope.ControlPlane.Status.ID, rosaMachinePool.Spec.NodePoolName)
if err != nil {
@@ -210,9 +225,25 @@
return ctrl.Result{}, fmt.Errorf("failed to ensure rosaMachinePool: %w", err)
}

// TODO (alberto): discover and store providerIDs from aws so the CAPI controller can match them to Nodes and report readiness.
rosaMachinePool.Status.Replicas = int32(nodePool.Status().CurrentReplicas())
if nodePool.Replicas() == nodePool.Status().CurrentReplicas() && nodePool.Status().Message() == "" {
currentReplicas := int32(nodePool.Status().CurrentReplicas())
if annotations.ReplicasManagedByExternalAutoscaler(machinePool) {
// Set MachinePool replicas to rosa autoscaling replicas
if *machinePool.Spec.Replicas != currentReplicas {
machinePoolScope.Info("Setting MachinePool replicas to rosa autoscaling replicas",
"local", *machinePool.Spec.Replicas,
"external", currentReplicas)
machinePool.Spec.Replicas = &currentReplicas
if err := machinePoolScope.PatchCAPIMachinePoolObject(ctx); err != nil {
return ctrl.Result{}, err
}
}
}
if err := r.reconcileProviderIDList(ctx, machinePoolScope, nodePool); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to reconcile ProviderIDList: %w", err)
}

rosaMachinePool.Status.Replicas = currentReplicas
if rosa.IsNodePoolReady(nodePool) {
conditions.MarkTrue(rosaMachinePool, expinfrav1.RosaMachinePoolReadyCondition)
rosaMachinePool.Status.Ready = true

@@ -234,7 +265,7 @@ func (r *ROSAMachinePoolReconciler) reconcileNormal(ctx context.Context,
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
}

npBuilder := nodePoolBuilder(rosaMachinePool.Spec, machinePoolScope.MachinePool.Spec)
npBuilder := nodePoolBuilder(rosaMachinePool.Spec, machinePool.Spec)
nodePoolSpec, err := npBuilder.Build()
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to build rosa nodepool: %w", err)
@@ -294,20 +325,7 @@ func (r *ROSAMachinePoolReconciler) reconcileMachinePoolVersion(machinePoolScope
}

if scheduledUpgrade == nil {
policy, err := ocmClient.BuildNodeUpgradePolicy(version, nodePool.ID(), ocm.UpgradeScheduling{
AutomaticUpgrades: false,
// The OCM API places guardrails around the minimum and maximum delay that a user can request,
// for the next run of the upgrade, which is [5min,6mo]. Set our next run request to something
// slightly longer than 5min to make sure we account for the latency between when we send this
// request and when the server processes it.
// https://gitlab.cee.redhat.com/service/uhc-clusters-service/-/blob/master/cmd/clusters-service/servecmd/apiserver/upgrade_policy_handlers.go
NextRun: time.Now().Add(6 * time.Minute),
})
if err != nil {
return fmt.Errorf("failed to create nodePool upgrade schedule to version %s: %w", version, err)
}

scheduledUpgrade, err = ocmClient.ScheduleNodePoolUpgrade(clusterID, nodePool.ID(), policy)
scheduledUpgrade, err = rosa.ScheduleNodePoolUpgrade(ocmClient, clusterID, nodePool, version, time.Now())
if err != nil {
return fmt.Errorf("failed to schedule nodePool upgrade to version %s: %w", version, err)
}
@@ -453,6 +471,47 @@ func nodePoolToRosaMachinePoolSpec(nodePool *cmv1.NodePool) expinfrav1.RosaMachi
return spec
}

func (r *ROSAMachinePoolReconciler) reconcileProviderIDList(ctx context.Context, machinePoolScope *scope.RosaMachinePoolScope, nodePool *cmv1.NodePool) error {
tags := nodePool.AWSNodePool().Tags()
if len(tags) == 0 {
// can't identify EC2 instances belonging to this NodePool without tags.
return nil
}

ec2Svc := scope.NewEC2Client(machinePoolScope, machinePoolScope, &machinePoolScope.Logger, machinePoolScope.InfraCluster())
response, err := ec2Svc.DescribeInstancesWithContext(ctx, &ec2.DescribeInstancesInput{
Filters: buildEC2FiltersFromTags(tags),
})
if err != nil {
return err
}

var providerIDList []string
for _, reservation := range response.Reservations {
for _, instance := range reservation.Instances {
providerID := scope.GenerateProviderID(*instance.Placement.AvailabilityZone, *instance.InstanceId)
providerIDList = append(providerIDList, providerID)
}
}

machinePoolScope.RosaMachinePool.Spec.ProviderIDList = providerIDList
return nil
}

func buildEC2FiltersFromTags(tags map[string]string) []*ec2.Filter {
filters := make([]*ec2.Filter, 0, len(tags)) // zero length, not len(tags): the append below fills the slice
for key, value := range tags {
filters = append(filters, &ec2.Filter{
Name: ptr.To(fmt.Sprintf("tag:%s", key)),
Values: aws.StringSlice([]string{
value,
}),
})
}

return filters
}

func rosaControlPlaneToRosaMachinePoolMapFunc(c client.Client, gvk schema.GroupVersionKind, log logger.Wrapper) handler.MapFunc {
return func(ctx context.Context, o client.Object) []reconcile.Request {
rosaControlPlane, ok := o.(*rosacontrolplanev1.ROSAControlPlane)
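The new reconcileProviderIDList discovers the NodePool's EC2 instances by tag and turns them into provider IDs. Below is a minimal standalone sketch of that flow, assuming a plain aws-sdk-go session and a hypothetical tag key/value; the controller instead builds its EC2 client from the machine pool scope, as shown above.

package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ec2"
)

// providerIDsByTags returns "aws:///<availability-zone>/<instance-id>" for
// every EC2 instance carrying all of the given tags.
func providerIDsByTags(ctx context.Context, svc *ec2.EC2, tags map[string]string) ([]string, error) {
	filters := make([]*ec2.Filter, 0, len(tags))
	for key, value := range tags {
		filters = append(filters, &ec2.Filter{
			Name:   aws.String("tag:" + key),
			Values: aws.StringSlice([]string{value}),
		})
	}

	out, err := svc.DescribeInstancesWithContext(ctx, &ec2.DescribeInstancesInput{Filters: filters})
	if err != nil {
		return nil, err
	}

	var providerIDs []string
	for _, reservation := range out.Reservations {
		for _, instance := range reservation.Instances {
			providerIDs = append(providerIDs, fmt.Sprintf("aws:///%s/%s",
				aws.StringValue(instance.Placement.AvailabilityZone),
				aws.StringValue(instance.InstanceId)))
		}
	}
	return providerIDs, nil
}

func main() {
	// Region and tag key/value are placeholders for illustration only.
	sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-east-1")))
	ids, err := providerIDsByTags(context.TODO(), ec2.New(sess), map[string]string{
		"api.openshift.com/nodepool": "my-cluster-my-nodepool",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(ids)
}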
3 changes: 1 addition & 2 deletions pkg/cloud/scope/machine.go
@@ -19,7 +19,6 @@ package scope
import (
"context"
"encoding/base64"
"fmt"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
@@ -142,7 +141,7 @@ func (m *MachineScope) GetProviderID() string {

// SetProviderID sets the AWSMachine providerID in spec.
func (m *MachineScope) SetProviderID(instanceID, availabilityZone string) {
providerID := fmt.Sprintf("aws:///%s/%s", availabilityZone, instanceID)
providerID := GenerateProviderID(availabilityZone, instanceID)
m.AWSMachine.Spec.ProviderID = ptr.To[string](providerID)
}

12 changes: 12 additions & 0 deletions pkg/cloud/scope/providerid.go
@@ -17,6 +17,7 @@ limitations under the License.
package scope

import (
"fmt"
"regexp"
"strings"

@@ -124,3 +125,14 @@
func (p *ProviderID) IndexKey() string {
return p.String()
}

// ProviderIDPrefix is the prefix prepended to AWS resource IDs to form a Kubernetes provider ID.
// NOTE: this format matches the 2-slashes format used in cloud-provider and cluster-autoscaler.
const ProviderIDPrefix = "aws://"

// GenerateProviderID generates a valid AWS Node/Machine ProviderID field.
//
// The last ID provided is used as the resource identifier (the last path segment).
func GenerateProviderID(ids ...string) string {
return fmt.Sprintf("%s/%s", ProviderIDPrefix, strings.Join(ids, "/"))
}
55 changes: 55 additions & 0 deletions pkg/cloud/scope/providerid_test.go
@@ -0,0 +1,55 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scope

import (
"testing"

. "github.com/onsi/gomega"
)

func TestGenerateProviderID(t *testing.T) {
testCases := []struct {
ids []string

expectedProviderID string
}{
{
ids: []string{
"eu-west-1a",
"instance-id",
},
expectedProviderID: "aws:///eu-west-1a/instance-id",
},
{
ids: []string{
"eu-west-1a",
"test-id1",
"test-id2",
"instance-id",
},
expectedProviderID: "aws:///eu-west-1a/test-id1/test-id2/instance-id",
},
}

for _, tc := range testCases {
g := NewGomegaWithT(t)
providerID := GenerateProviderID(tc.ids...)

g.Expect(providerID).To(Equal(tc.expectedProviderID))
}
}
53 changes: 51 additions & 2 deletions pkg/cloud/scope/rosamachinepool.go
@@ -19,13 +19,16 @@ package scope
import (
"context"

awsclient "github.com/aws/aws-sdk-go/aws/client"
"github.com/pkg/errors"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2"
rosacontrolplanev1 "sigs.k8s.io/cluster-api-provider-aws/v2/controlplane/rosa/api/v1beta2"
expinfrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/exp/api/v1beta2"
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud"
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/throttle"
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/logger"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
expclusterv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
@@ -42,6 +45,8 @@ type RosaMachinePoolScopeParams struct {
RosaMachinePool *expinfrav1.ROSAMachinePool
MachinePool *expclusterv1.MachinePool
ControllerName string

Endpoints []ServiceEndpoint
}

// NewRosaMachinePoolScope creates a new Scope from the supplied parameters.
@@ -70,7 +75,7 @@ func NewRosaMachinePoolScope(params RosaMachinePoolScopeParams) (*RosaMachinePoo
return nil, errors.Wrap(err, "failed to init MachinePool patch helper")
}

return &RosaMachinePoolScope{
scope := &RosaMachinePoolScope{
Logger: *params.Logger,
Client: params.Client,
patchHelper: ammpHelper,
@@ -81,9 +86,22 @@ func NewRosaMachinePoolScope(params RosaMachinePoolScopeParams) (*RosaMachinePoo
RosaMachinePool: params.RosaMachinePool,
MachinePool: params.MachinePool,
controllerName: params.ControllerName,
}, nil
}

session, serviceLimiters, err := sessionForClusterWithRegion(params.Client, scope, params.ControlPlane.Spec.Region, params.Endpoints, params.Logger)
if err != nil {
return nil, errors.Errorf("failed to create aws session: %v", err)
}

scope.session = session
scope.serviceLimiters = serviceLimiters

return scope, nil
}

var _ cloud.Session = &RosaMachinePoolScope{}
var _ cloud.SessionMetadata = &RosaMachinePoolScope{}

// RosaMachinePoolScope defines the basic context for an actuator to operate upon.
type RosaMachinePoolScope struct {
logger.Logger
@@ -96,6 +114,9 @@ type RosaMachinePoolScope struct {
RosaMachinePool *expinfrav1.ROSAMachinePool
MachinePool *expclusterv1.MachinePool

session awsclient.ConfigProvider
serviceLimiters throttle.ServiceLimiters

controllerName string
}

@@ -139,6 +160,34 @@ func (s *RosaMachinePoolScope) GetSetter() conditions.Setter {
return s.RosaMachinePool
}

// ServiceLimiter implements cloud.Session.
func (s *RosaMachinePoolScope) ServiceLimiter(service string) *throttle.ServiceLimiter {
if sl, ok := s.serviceLimiters[service]; ok {
return sl
}
return nil
}

// Session implements cloud.Session.
func (s *RosaMachinePoolScope) Session() awsclient.ConfigProvider {
return s.session
}

// IdentityRef implements cloud.SessionMetadata.
func (s *RosaMachinePoolScope) IdentityRef() *v1beta2.AWSIdentityReference {
return s.ControlPlane.Spec.IdentityRef
}

// InfraClusterName implements cloud.SessionMetadata.
func (s *RosaMachinePoolScope) InfraClusterName() string {
return s.ControlPlane.Name
}

// Namespace implements cloud.SessionMetadata.
func (s *RosaMachinePoolScope) Namespace() string {
return s.Cluster.Namespace
}

// RosaMchinePoolReadyFalse marks the ready condition as false, with warning severity
// if the error message isn't empty.
func (s *RosaMachinePoolScope) RosaMchinePoolReadyFalse(reason string, err string) error {
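The var _ cloud.Session = &RosaMachinePoolScope{} lines added above are Go's standard compile-time assertion that a type satisfies an interface: the assignment stops compiling the moment the scope no longer implements it, so a missing method is caught at build time rather than at runtime. A tiny sketch of the idiom with illustrative names (none of these are CAPA types):

package main

import "fmt"

// Session is a stand-in for cloud.Session.
type Session interface {
	Region() string
}

type machinePoolScope struct {
	region string
}

func (s *machinePoolScope) Region() string { return s.region }

// Compile-time check: *machinePoolScope must implement Session,
// otherwise this file fails to compile.
var _ Session = &machinePoolScope{}

func main() {
	var s Session = &machinePoolScope{region: "us-east-1"}
	fmt.Println(s.Region()) // us-east-1
}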
23 changes: 23 additions & 0 deletions pkg/rosa/helpers.go
@@ -0,0 +1,23 @@
package rosa

import (
cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
)

// IsNodePoolReady checks whether the NodePool is provisioned and all replicas are available.
// If autoscaling is enabled, the NodePool must have replicas >= autoscaling.MinReplica to be considered ready.
func IsNodePoolReady(nodePool *cmv1.NodePool) bool {
if nodePool.Status().Message() != "" {
return false
}

if nodePool.Replicas() != 0 {
return nodePool.Replicas() == nodePool.Status().CurrentReplicas()
}

if nodePool.Autoscaling() != nil {
return nodePool.Status().CurrentReplicas() >= nodePool.Autoscaling().MinReplica()
}

return false
}
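A sketch of how IsNodePoolReady's three branches (status message set, fixed replicas, autoscaling) might be exercised. The builder chain (NewNodePool, NewNodePoolStatus, NewNodePoolAutoscaling) assumes the OCM SDK's usual generated-builder pattern; the test names and values are illustrative.

package rosa

import (
	"testing"

	cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
)

func TestIsNodePoolReady(t *testing.T) {
	// Fixed-size pool with all replicas available.
	fixed, _ := cmv1.NewNodePool().
		Replicas(2).
		Status(cmv1.NewNodePoolStatus().CurrentReplicas(2)).
		Build()

	// Autoscaled pool at or above its minimum.
	autoscaled, _ := cmv1.NewNodePool().
		Autoscaling(cmv1.NewNodePoolAutoscaling().MinReplica(1).MaxReplica(3)).
		Status(cmv1.NewNodePoolStatus().CurrentReplicas(2)).
		Build()

	// Any status message marks the pool as not ready.
	errored, _ := cmv1.NewNodePool().
		Replicas(2).
		Status(cmv1.NewNodePoolStatus().CurrentReplicas(2).Message("instances failed to join")).
		Build()

	cases := []struct {
		name     string
		nodePool *cmv1.NodePool
		expected bool
	}{
		{"fixed replicas all current", fixed, true},
		{"autoscaling above min replicas", autoscaled, true},
		{"status message set", errored, false},
	}

	for _, tc := range cases {
		if got := IsNodePoolReady(tc.nodePool); got != tc.expected {
			t.Errorf("%s: expected %v, got %v", tc.name, tc.expected, got)
		}
	}
}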
