From df699dd649effe244f05ca466c69c5cedb0bd728 Mon Sep 17 00:00:00 2001 From: Sara Elzayat Date: Tue, 21 Nov 2023 06:45:38 +0200 Subject: [PATCH] EKS reflector implementation This adds support for validating that the configuration for EKS and AKS clusters is provided when the type is selected. --- .../automatedclusterdiscovery_types.go | 23 +- api/v1alpha1/zz_generated.deepcopy.go | 20 + cmd/cluster-reflector-cli/main.go | 78 +++- ...ave.works_automatedclusterdiscoveries.yaml | 19 +- go.mod | 2 + go.sum | 10 + .../automatedclusterdiscovery_controller.go | 85 +++-- ...tomatedclusterdiscovery_controller_test.go | 342 ++++++++---------- main.go | 18 +- pkg/providers/aws/aws.go | 193 ++++++++++ pkg/providers/aws/aws_test.go | 201 ++++++++++ pkg/providers/capi/capi.go | 4 +- 12 files changed, 727 insertions(+), 268 deletions(-) create mode 100644 pkg/providers/aws/aws.go create mode 100644 pkg/providers/aws/aws_test.go diff --git a/api/v1alpha1/automatedclusterdiscovery_types.go b/api/v1alpha1/automatedclusterdiscovery_types.go index 8b9c2ce..4012210 100644 --- a/api/v1alpha1/automatedclusterdiscovery_types.go +++ b/api/v1alpha1/automatedclusterdiscovery_types.go @@ -21,20 +21,24 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// AKS defines the desired state of AKS +// AKS configures how AKS clusters are reflected. type AKS struct { // SubscriptionID is the Azure subscription ID // +required SubscriptionID string `json:"subscriptionID"` } +// EKS configures how AKS clusters are reflected. +type EKS struct { + // Region is the AWS region + // +required + Region string `json:"region"` +} + // AutomatedClusterDiscoverySpec defines the desired state of AutomatedClusterDiscovery type AutomatedClusterDiscoverySpec struct { - // Name is the name of the cluster - Name string `json:"name,omitempty"` - - // Type is the provider type. - // +kubebuilder:validation:Enum=aks;capi + // Type is the provider type + // +kubebuilder:validation:Enum=aks;eks;capi Type string `json:"type"` // If DisableTags is true, labels will not be applied to the generated @@ -42,9 +46,14 @@ type AutomatedClusterDiscoverySpec struct { // +optional DisableTags bool `json:"disableTags"` - // AKS configures discovery of AKS clusters from Azure. + // AKS configures discovery of AKS clusters from Azure. Must be provided if + // the type is aks. AKS *AKS `json:"aks,omitempty"` + // EKS configures discovery of EKS clusters from AWS. Must be provided if + // the type is eks. + EKS *EKS `json:"eks,omitempty"` + // The interval at which to run the discovery // +required Interval metav1.Duration `json:"interval"` diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 15f7120..cefe5fd 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -107,6 +107,11 @@ func (in *AutomatedClusterDiscoverySpec) DeepCopyInto(out *AutomatedClusterDisco *out = new(AKS) **out = **in } + if in.EKS != nil { + in, out := &in.EKS, &out.EKS + *out = new(EKS) + **out = **in + } out.Interval = in.Interval if in.CommonLabels != nil { in, out := &in.CommonLabels, &out.CommonLabels @@ -162,6 +167,21 @@ func (in *AutomatedClusterDiscoveryStatus) DeepCopy() *AutomatedClusterDiscovery return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EKS) DeepCopyInto(out *EKS) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EKS. +func (in *EKS) DeepCopy() *EKS { + if in == nil { + return nil + } + out := new(EKS) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ResourceInventory) DeepCopyInto(out *ResourceInventory) { *out = *in diff --git a/cmd/cluster-reflector-cli/main.go b/cmd/cluster-reflector-cli/main.go index 888a5fc..1234afd 100644 --- a/cmd/cluster-reflector-cli/main.go +++ b/cmd/cluster-reflector-cli/main.go @@ -4,6 +4,8 @@ import ( "fmt" gitopsv1alpha1 "github.com/weaveworks/cluster-controller/api/v1alpha1" + "github.com/weaveworks/cluster-reflector-controller/pkg/providers" + "github.com/weaveworks/cluster-reflector-controller/pkg/providers/aws" "github.com/weaveworks/cluster-reflector-controller/pkg/providers/azure" "github.com/weaveworks/cluster-reflector-controller/pkg/sync" corev1 "k8s.io/api/core/v1" @@ -21,25 +23,67 @@ type GitopsClusterOutput struct { Secret *corev1.Secret } -func main() { - var azureSubscriptionID string - var namespace string - var export bool +type Params struct { + Provider string + AWSRegion string + AzureSubscriptionID string + Namespace string + Export bool +} + +var params Params +const authHelperMessage = ` +If you're using a credential_process in your ~/.aws/config, you'll need to set the AWS_SDK_LOAD_CONFIG environment variable: + +AWS_SDK_LOAD_CONFIG=1 cluster-reflector-cli reflect ... +` + +func main() { var reflectCmd = &cobra.Command{ Use: "reflect", - Short: "Reflect AKS clusters", + Short: "Reflect AKS/EKS clusters", RunE: func(cmd *cobra.Command, args []string) error { - azureProvider := azure.NewAzureProvider(azureSubscriptionID) + if params.Provider == "" { + return fmt.Errorf("provider must be set") + } + + if params.Provider != "aws" && params.Provider != "azure" { + return fmt.Errorf("provider must be aws or azure") + } - clusters, err := azureProvider.ListClusters(cmd.Context()) - if err != nil { - return fmt.Errorf("failed to list clusters: %w", err) + if params.Provider == "azure" && params.AzureSubscriptionID == "" { + return fmt.Errorf("azure-subscription-id must be set") + } + + if params.Namespace == "default" { + fmt.Fprint(cmd.ErrOrStderr(), "WARNING: You are using the default namespace. This is not recommended.\n") + } + + clusters := []*providers.ProviderCluster{} + var err error + + if params.Provider == "aws" { + awsProvider := aws.NewAWSProvider(params.AWSRegion) + + clusters, err = awsProvider.ListClusters(cmd.Context()) + if err != nil { + return fmt.Errorf("failed to list clusters: %w\n%s", err, authHelperMessage) + } + } + + if params.Provider == "azure" { + azureProvider := azure.NewAzureProvider(params.AzureSubscriptionID) + + clusters, err = azureProvider.ListClusters(cmd.Context()) + if err != nil { + return fmt.Errorf("failed to list clusters: %w", err) + } } var k8sClient client.Client - if !export { + if !params.Export { k8sClient, err = CreateClient() if err != nil { return fmt.Errorf("failed to create client: %w", err) @@ -53,14 +97,14 @@ func main() { exports := []runtime.Object{} for _, cluster := range clusters { - gc, gcs, err := sync.SyncCluster(cmd.Context(), k8sClient, namespace, cluster) + gc, gcs, err := sync.SyncCluster(cmd.Context(), k8sClient, params.Namespace, cluster) if err != nil { return fmt.Errorf("failed to sync cluster: %w", err) } exports = append(exports, gc, gcs) } - if export { + if params.Export { for _, obj := range exports { clusterBytes, err := yaml.Marshal(obj) if err != nil { @@ -80,9 +124,13 @@ func main() { }, } - reflectCmd.Flags().StringVar(&azureSubscriptionID, "azure-subscription-id", "", "Azure Subscription ID") - reflectCmd.Flags().StringVar(&namespace, "namespace", "default", "Namespace to create the GitopsCluster in") - reflectCmd.Flags().BoolVar(&export, "export", false, "Export resources to stdout") + reflectCmd.Flags().StringVar(¶ms.Provider, "provider", "", "Provider to use (aws or azure)") + reflectCmd.Flags().StringVar(¶ms.AWSRegion, "region", "us-west-2", "AWS Region") + reflectCmd.Flags().StringVar(¶ms.AzureSubscriptionID, "azure-subscription-id", "", "Azure Subscription ID") + reflectCmd.Flags().StringVar(¶ms.Namespace, "namespace", "default", "Namespace to create the GitopsCluster in") + reflectCmd.Flags().BoolVar(¶ms.Export, "export", false, "Export resources to stdout") + + reflectCmd.MarkFlagRequired("provider") var rootCmd = &cobra.Command{Use: "cluster-reflector-cli"} rootCmd.AddCommand(reflectCmd) diff --git a/config/crd/bases/clusters.weave.works_automatedclusterdiscoveries.yaml b/config/crd/bases/clusters.weave.works_automatedclusterdiscoveries.yaml index 176fcd4..deb9ef4 100644 --- a/config/crd/bases/clusters.weave.works_automatedclusterdiscoveries.yaml +++ b/config/crd/bases/clusters.weave.works_automatedclusterdiscoveries.yaml @@ -44,7 +44,8 @@ spec: AutomatedClusterDiscovery properties: aks: - description: AKS defines the desired state of AKS + description: AKS configures discovery of AKS clusters from Azure. + Must be provided if the type is aks. properties: subscriptionID: description: SubscriptionID is the Azure subscription ID @@ -66,20 +67,28 @@ spec: description: If DisableTags is true, labels will not be applied to the generated Clusters from the tags on the upstream Clusters. type: boolean + eks: + description: EKS configures discovery of EKS clusters from AWS. Must + be provided if the type is eks. + properties: + region: + description: Region is the AWS region + type: string + required: + - region + type: object interval: description: The interval at which to run the discovery type: string - name: - description: Name is the name of the cluster - type: string suspend: description: Suspend tells the controller to suspend the reconciliation of this AutomatedClusterDiscovery. type: boolean type: - description: Type is the provider type. + description: Type is the provider type enum: - aks + - eks - capi type: string required: diff --git a/go.mod b/go.mod index 2f91ab9..34f8afb 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.3.0 github.com/Azure/go-autorest/autorest v0.11.29 + github.com/aws/aws-sdk-go v1.44.137 github.com/fluxcd/pkg/apis/meta v1.1.2 github.com/fluxcd/pkg/runtime v0.41.0 github.com/google/go-cmp v0.5.9 @@ -60,6 +61,7 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.4 // indirect github.com/imdario/mergo v0.3.13 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kylelemons/godebug v1.1.0 // indirect diff --git a/go.sum b/go.sum index ca879a9..d5b0dd4 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/MakeNowJust/heredoc v1.0.0 h1:cXCdzVdstXyiTqTvfqk9SDHpKNjxuom+DOlyEeQ4pzQ= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/aws/aws-sdk-go v1.44.137 h1:GH2bUPiW7/gHtB04NxQOSOrKqFNjLGKmqt5YaO+K1SE= +github.com/aws/aws-sdk-go v1.44.137/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -160,6 +162,10 @@ github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -294,6 +300,7 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= @@ -324,12 +331,14 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20220526004731-065cf7ba2467/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= @@ -339,6 +348,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= diff --git a/internal/controller/automatedclusterdiscovery_controller.go b/internal/controller/automatedclusterdiscovery_controller.go index 964a6a4..4cff1e6 100644 --- a/internal/controller/automatedclusterdiscovery_controller.go +++ b/internal/controller/automatedclusterdiscovery_controller.go @@ -18,6 +18,7 @@ package controller import ( "context" + "errors" "fmt" "sort" "time" @@ -27,6 +28,9 @@ import ( gitopsv1alpha1 "github.com/weaveworks/cluster-controller/api/v1alpha1" clustersv1alpha1 "github.com/weaveworks/cluster-reflector-controller/api/v1alpha1" "github.com/weaveworks/cluster-reflector-controller/pkg/providers" + "github.com/weaveworks/cluster-reflector-controller/pkg/providers/aws" + "github.com/weaveworks/cluster-reflector-controller/pkg/providers/azure" + "github.com/weaveworks/cluster-reflector-controller/pkg/providers/capi" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -48,14 +52,15 @@ type eventRecorder interface { Event(object runtime.Object, eventtype, reason, message string) } +type providerFactoryFunc func(client.Reader, *clustersv1alpha1.AutomatedClusterDiscovery) (providers.Provider, error) + // AutomatedClusterDiscoveryReconciler reconciles a AutomatedClusterDiscovery object type AutomatedClusterDiscoveryReconciler struct { client.Client Scheme *runtime.Scheme EventRecorder eventRecorder - AKSProvider func(string) providers.Provider - CAPIProvider func(client.Client, string) providers.Provider + ProviderFactory providerFactoryFunc } // event emits a Kubernetes event and forwards the event to the event recorder @@ -89,7 +94,6 @@ func (r *AutomatedClusterDiscoveryReconciler) Reconcile(ctx context.Context, req logger.Info("reconciling cluster reflector", "type", clusterDiscovery.Spec.Type, - "name", clusterDiscovery.Spec.Name, ) // Set the value of the reconciliation request in status. @@ -149,44 +153,22 @@ func (r *AutomatedClusterDiscoveryReconciler) Reconcile(ctx context.Context, req func (r *AutomatedClusterDiscoveryReconciler) reconcileResources(ctx context.Context, cd *clustersv1alpha1.AutomatedClusterDiscovery) ([]clustersv1alpha1.ResourceRef, error) { logger := log.FromContext(ctx) - var err error - clusters, clusterID := []*providers.ProviderCluster{}, "" - - if cd.Spec.Type == "aks" { - logger.Info("reconciling AKS cluster reflector", - "name", cd.Spec.Name, - ) - - azureProvider := r.AKSProvider(cd.Spec.AKS.SubscriptionID) - - // We get the clusters and cluster ID separately so that we can return - // the error from the Reconciler without touching the inventory. - clusters, err = azureProvider.ListClusters(ctx) - if err != nil { - logger.Error(err, "failed to list AKS clusters") - - return nil, err - } - - clusterID, err = azureProvider.ClusterID(ctx, r.Client) - if err != nil { - logger.Error(err, "failed to list get Cluster ID from AKS cluster") - - return nil, err - } - } else if cd.Spec.Type == "capi" { - logger.Info("reconciling CAPI cluster reflector", - "name", cd.Spec.Name, - ) - - capiProvider := r.CAPIProvider(r.Client, cd.Namespace) + provider, err := r.ProviderFactory(r.Client, cd) + if err != nil { + logger.Error(err, "failed to create provider", "type", cd.Spec.Type) + return nil, err + } - clusters, err = capiProvider.ListClusters(ctx) - if err != nil { - logger.Error(err, "failed to list CAPI clusters") - return nil, err - } + clusters, err := provider.ListClusters(ctx) + if err != nil { + logger.Error(err, "failed to list clusters from provider", "type", cd.Spec.Type) + return nil, err + } + clusterID, err := provider.ClusterID(ctx, r.Client) + if err != nil { + logger.Error(err, "failed to get cluster ID", "type", cd.Spec.Type) + return nil, err } // TODO: Fix this so that we record the inventoryRefs even if we get an @@ -194,7 +176,6 @@ func (r *AutomatedClusterDiscoveryReconciler) reconcileResources(ctx context.Con inventoryRefs, err := r.reconcileClusters(ctx, clusters, clusterID, cd) if err != nil { logger.Error(err, "failed to reconcile clusters") - return nil, err } @@ -498,3 +479,27 @@ func mergeMaps[K comparable, V any](maps ...map[K]V) map[K]V { return result } + +// DefaultProviderFactory creates an appropriate factory for creating provider +// clients based on the spec of the AutomatedClusterDiscovery. +func DefaultProviderFactory(k8sClient client.Reader, acd *clustersv1alpha1.AutomatedClusterDiscovery) (providers.Provider, error) { + switch acd.Spec.Type { + case "aks": + if acd.Spec.AKS == nil { + return nil, errors.New("discovery .spec.type = aks but no AKS configuration provided") + } + + return azure.NewAzureProvider(acd.Spec.AKS.SubscriptionID), nil + case "eks": + if acd.Spec.EKS == nil { + return nil, errors.New("discovery .spec.type = eks but no EKS configuration provided") + } + + return aws.NewAWSProvider(acd.Spec.EKS.Region), nil + + case "capi": + return capi.NewCAPIProvider(k8sClient, acd.GetNamespace()), nil + } + + return nil, fmt.Errorf("unknown provider type: %s", acd.Spec.Type) +} diff --git a/internal/controller/automatedclusterdiscovery_controller_test.go b/internal/controller/automatedclusterdiscovery_controller_test.go index 6084a7d..09f7d8a 100644 --- a/internal/controller/automatedclusterdiscovery_controller_test.go +++ b/internal/controller/automatedclusterdiscovery_controller_test.go @@ -2,6 +2,7 @@ package controller import ( "context" + "fmt" "path/filepath" "sort" "testing" @@ -62,19 +63,8 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { assert.NoError(t, err) t.Run("Reconcile with AKS", func(t *testing.T) { - aksCluster := &clustersv1alpha1.AutomatedClusterDiscovery{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-aks", - Namespace: "default", - }, - Spec: clustersv1alpha1.AutomatedClusterDiscoverySpec{ - Type: "aks", - AKS: &clustersv1alpha1.AKS{ - SubscriptionID: "subscription-123", - }, - Interval: metav1.Duration{Duration: time.Minute}, - }, - } + aksCluster := newAutomatedClusterDiscovery("test-aks", + aksProviderOption("subscription-123")) testProvider := stubProvider{ response: []*providers.ProviderCluster{ @@ -94,12 +84,10 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { } reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - AKSProvider: func(providerID string) providers.Provider { - return &testProvider - }, - EventRecorder: &mockEventRecorder{}, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&testProvider), + EventRecorder: &mockEventRecorder{}, } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -168,24 +156,13 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { "test.example.com/annotation": "test", "example.com/test": "annotation", } - - aksCluster := &clustersv1alpha1.AutomatedClusterDiscovery{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-aks", - Namespace: "default", - }, - Spec: clustersv1alpha1.AutomatedClusterDiscoverySpec{ - Type: "aks", - AKS: &clustersv1alpha1.AKS{ - SubscriptionID: "subscription-123", - }, - Interval: metav1.Duration{Duration: time.Minute}, - CommonLabels: map[string]string{ + aksCluster := newAutomatedClusterDiscovery("test-aks", + aksProviderOption("subscription-123"), + commonLabels( + map[string]string{ "example.com/label": "test", }, - CommonAnnotations: wantAnnotations, - }, - } + ), commonAnnotations(wantAnnotations)) testProvider := stubProvider{ response: []*providers.ProviderCluster{ @@ -205,12 +182,10 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { } reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - AKSProvider: func(providerID string) providers.Provider { - return &testProvider - }, - EventRecorder: &mockEventRecorder{}, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&testProvider), + EventRecorder: &mockEventRecorder{}, } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -252,19 +227,8 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { }) t.Run("Reconcile with cluster labels applies labels to generated cluster", func(t *testing.T) { - aksCluster := &clustersv1alpha1.AutomatedClusterDiscovery{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-aks", - Namespace: "default", - }, - Spec: clustersv1alpha1.AutomatedClusterDiscoverySpec{ - Type: "aks", - AKS: &clustersv1alpha1.AKS{ - SubscriptionID: "subscription-123", - }, - Interval: metav1.Duration{Duration: time.Minute}, - }, - } + aksCluster := newAutomatedClusterDiscovery("test-aks", + aksProviderOption("subscription-123")) testProvider := stubProvider{ response: []*providers.ProviderCluster{ @@ -287,12 +251,10 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { } reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - AKSProvider: func(providerID string) providers.Provider { - return &testProvider - }, - EventRecorder: &mockEventRecorder{}, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&testProvider), + EventRecorder: &mockEventRecorder{}, } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -335,20 +297,11 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { }) t.Run("Reconcile with cluster labels does not apply labels to cluster when tags disabled", func(t *testing.T) { - aksCluster := &clustersv1alpha1.AutomatedClusterDiscovery{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-aks-disabled-tags", - Namespace: "default", - }, - Spec: clustersv1alpha1.AutomatedClusterDiscoverySpec{ - Type: "aks", - DisableTags: true, - AKS: &clustersv1alpha1.AKS{ - SubscriptionID: "subscription-123", - }, - Interval: metav1.Duration{Duration: time.Minute}, - }, - } + aksCluster := newAutomatedClusterDiscovery("test-aks-disabled-tags", + aksProviderOption("subscription-123"), + func(a *clustersv1alpha1.AutomatedClusterDiscovery) { + a.Spec.DisableTags = true + }) testProvider := stubProvider{ response: []*providers.ProviderCluster{ @@ -371,12 +324,10 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { } reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - AKSProvider: func(providerID string) providers.Provider { - return &testProvider - }, - EventRecorder: &mockEventRecorder{}, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&testProvider), + EventRecorder: &mockEventRecorder{}, } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -417,19 +368,8 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { }) t.Run("Reconcile when executing in cluster and cluster matches reflector cluster", func(t *testing.T) { - aksCluster := &clustersv1alpha1.AutomatedClusterDiscovery{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-aks", - Namespace: "default", - }, - Spec: clustersv1alpha1.AutomatedClusterDiscoverySpec{ - Type: "aks", - AKS: &clustersv1alpha1.AKS{ - SubscriptionID: "subscription-123", - }, - Interval: metav1.Duration{Duration: time.Minute}, - }, - } + aksCluster := newAutomatedClusterDiscovery("test-aks-disabled-tags", + aksProviderOption("subscription-123")) testClusterID := "/subscriptions/ace37984-aaaa-1234-1234-a1a12c0ae14b/resourcegroups/team-pesto-use1/providers/Microsoft.ContainerService/managedClusters/test-cluster" testProvider := stubProvider{ @@ -452,12 +392,10 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { } reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - AKSProvider: func(providerID string) providers.Provider { - return &testProvider - }, - EventRecorder: &mockEventRecorder{}, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&testProvider), + EventRecorder: &mockEventRecorder{}, } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -486,19 +424,8 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { t.Run("Reconcile when cluster has been removed from AKS", func(t *testing.T) { ctx := context.TODO() - aksCluster := &clustersv1alpha1.AutomatedClusterDiscovery{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-aks", - Namespace: "default", - }, - Spec: clustersv1alpha1.AutomatedClusterDiscoverySpec{ - Type: "aks", - AKS: &clustersv1alpha1.AKS{ - SubscriptionID: "subscription-123", - }, - Interval: metav1.Duration{Duration: time.Minute}, - }, - } + aksCluster := newAutomatedClusterDiscovery("test-aks-disabled-tags", + aksProviderOption("subscription-123")) err := k8sClient.Create(ctx, aksCluster) assert.NoError(t, err) @@ -526,12 +453,10 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { } reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - AKSProvider: func(providerID string) providers.Provider { - return &testProvider - }, - EventRecorder: &mockEventRecorder{}, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&testProvider), + EventRecorder: &mockEventRecorder{}, } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -556,19 +481,8 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { t.Run("Reconcile updates Secret value for existing clusters", func(t *testing.T) { ctx := context.TODO() - aksCluster := &clustersv1alpha1.AutomatedClusterDiscovery{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-aks", - Namespace: "default", - }, - Spec: clustersv1alpha1.AutomatedClusterDiscoverySpec{ - Type: "aks", - AKS: &clustersv1alpha1.AKS{ - SubscriptionID: "subscription-123", - }, - Interval: metav1.Duration{Duration: time.Minute}, - }, - } + aksCluster := newAutomatedClusterDiscovery("test-aks-disabled-tags", + aksProviderOption("subscription-123")) err := k8sClient.Create(ctx, aksCluster) assert.NoError(t, err) @@ -598,12 +512,10 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { } reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - AKSProvider: func(providerID string) providers.Provider { - return &testProvider - }, - EventRecorder: &mockEventRecorder{}, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&testProvider), + EventRecorder: &mockEventRecorder{}, } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -632,20 +544,11 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { t.Run("Reconcile suspended cluster discovery resource", func(t *testing.T) { ctx := context.TODO() - aksCluster := &clustersv1alpha1.AutomatedClusterDiscovery{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-aks", - Namespace: "default", - }, - Spec: clustersv1alpha1.AutomatedClusterDiscoverySpec{ - Type: "aks", - AKS: &clustersv1alpha1.AKS{ - SubscriptionID: "subscription-123", - }, - Interval: metav1.Duration{Duration: time.Minute}, - Suspend: true, - }, - } + aksCluster := newAutomatedClusterDiscovery("test-aks-disabled-tags", + aksProviderOption("subscription-123"), + func(a *clustersv1alpha1.AutomatedClusterDiscovery) { + a.Spec.Suspend = true + }) testProvider := stubProvider{ response: []*providers.ProviderCluster{ @@ -665,12 +568,10 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { } reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - AKSProvider: func(providerID string) providers.Provider { - return &testProvider - }, - EventRecorder: &mockEventRecorder{}, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&testProvider), + EventRecorder: &mockEventRecorder{}, } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -730,12 +631,10 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { } reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - AKSProvider: func(providerID string) providers.Provider { - return &testProvider - }, - EventRecorder: &mockEventRecorder{}, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&testProvider), + EventRecorder: &mockEventRecorder{}, } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -798,11 +697,9 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { } reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - AKSProvider: func(providerID string) providers.Provider { - return &testProvider - }, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&testProvider), } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -869,12 +766,10 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { mockEventRecorder := &mockEventRecorder{} reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - AKSProvider: func(providerID string) providers.Provider { - return &testProvider - }, - EventRecorder: mockEventRecorder, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&testProvider), + EventRecorder: mockEventRecorder, } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -940,12 +835,10 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { } reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - CAPIProvider: func(capiclient client.Client, namespace string) providers.Provider { - return &testProvider - }, - EventRecorder: &mockEventRecorder{}, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&testProvider), + EventRecorder: &mockEventRecorder{}, } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -1009,6 +902,40 @@ func TestAutomatedClusterDiscoveryReconciler(t *testing.T) { assertHasOwnerReference(t, secret, clusterRef) }) + t.Run("Reconcile with missing configuration for type", func(t *testing.T) { + reconciler := &AutomatedClusterDiscoveryReconciler{ + Client: k8sClient, + Scheme: scheme, + ProviderFactory: DefaultProviderFactory, + EventRecorder: &mockEventRecorder{}, + } + assert.NoError(t, reconciler.SetupWithManager(mgr)) + + typeTests := []struct { + discoveryType string + wantErr string + }{ + {"aks", "discovery .spec.type = aks but no AKS configuration provided"}, + {"eks", "discovery .spec.type = eks but no EKS configuration provided"}, + } + + for _, tt := range typeTests { + t.Run(fmt.Sprintf("type %s", tt.discoveryType), func(t *testing.T) { + aksCluster := newAutomatedClusterDiscovery("test-aks", + func(a *clustersv1alpha1.AutomatedClusterDiscovery) { + a.Spec.Type = tt.discoveryType + }) + + ctx := context.TODO() + err = k8sClient.Create(ctx, aksCluster) + assert.NoError(t, err) + defer deleteClusterDiscoveryAndInventory(t, k8sClient, aksCluster) + + _, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKeyFromObject(aksCluster)}) + assert.ErrorContains(t, err, tt.wantErr) + }) + } + }) } func TestReconcilingWithAnnotationChange(t *testing.T) { @@ -1060,12 +987,10 @@ func TestReconcilingWithAnnotationChange(t *testing.T) { defer deleteClusterDiscoveryAndInventory(t, k8sClient, aksCluster) reconciler := &AutomatedClusterDiscoveryReconciler{ - Client: k8sClient, - Scheme: scheme, - AKSProvider: func(providerID string) providers.Provider { - return &stubProvider{} - }, - EventRecorder: &mockEventRecorder{}, + Client: k8sClient, + Scheme: scheme, + ProviderFactory: testProviderFactory(&stubProvider{}), + EventRecorder: &mockEventRecorder{}, } assert.NoError(t, reconciler.SetupWithManager(mgr)) @@ -1236,3 +1161,48 @@ func isOwnerReferenceEqual(a, b metav1.OwnerReference) bool { (a.Name == b.Name) && (a.UID == b.UID) } + +func aksProviderOption(subscriptionID string) func(*clustersv1alpha1.AutomatedClusterDiscovery) { + return func(acd *clustersv1alpha1.AutomatedClusterDiscovery) { + acd.Spec.Type = "aks" + acd.Spec.AKS = &clustersv1alpha1.AKS{ + SubscriptionID: subscriptionID, + } + } +} + +func commonLabels(labels map[string]string) func(*clustersv1alpha1.AutomatedClusterDiscovery) { + return func(acd *clustersv1alpha1.AutomatedClusterDiscovery) { + acd.Spec.CommonLabels = labels + } +} + +func commonAnnotations(annotations map[string]string) func(*clustersv1alpha1.AutomatedClusterDiscovery) { + return func(acd *clustersv1alpha1.AutomatedClusterDiscovery) { + acd.Spec.CommonAnnotations = annotations + } +} + +func newAutomatedClusterDiscovery(name string, opts ...func(*clustersv1alpha1.AutomatedClusterDiscovery)) *clustersv1alpha1.AutomatedClusterDiscovery { + discovery := &clustersv1alpha1.AutomatedClusterDiscovery{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + Spec: clustersv1alpha1.AutomatedClusterDiscoverySpec{ + Interval: metav1.Duration{Duration: time.Minute}, + }, + } + + for _, opt := range opts { + opt(discovery) + } + + return discovery +} + +func testProviderFactory(tp *stubProvider) providerFactoryFunc { + return func(client.Reader, *clustersv1alpha1.AutomatedClusterDiscovery) (providers.Provider, error) { + return tp, nil + } +} diff --git a/main.go b/main.go index 3c16703..4e7b226 100644 --- a/main.go +++ b/main.go @@ -29,16 +29,13 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" gitopsv1alpha1 "github.com/weaveworks/cluster-controller/api/v1alpha1" clustersv1alpha1 "github.com/weaveworks/cluster-reflector-controller/api/v1alpha1" "github.com/weaveworks/cluster-reflector-controller/internal/controller" - "github.com/weaveworks/cluster-reflector-controller/pkg/providers" - "github.com/weaveworks/cluster-reflector-controller/pkg/providers/azure" - "github.com/weaveworks/cluster-reflector-controller/pkg/providers/capi" + capiclusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" //+kubebuilder:scaffold:imports ) @@ -108,15 +105,10 @@ func main() { } if err = (&controller.AutomatedClusterDiscoveryReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - EventRecorder: eventRecorder, - AKSProvider: func(subscriptionID string) providers.Provider { - return azure.NewAzureProvider(subscriptionID) - }, - CAPIProvider: func(kubeclient client.Client, namespace string) providers.Provider { - return capi.NewCAPIProvider(kubeclient, namespace) - }, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + EventRecorder: eventRecorder, + ProviderFactory: controller.DefaultProviderFactory, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "AutomatedClusterDiscovery") os.Exit(1) diff --git a/pkg/providers/aws/aws.go b/pkg/providers/aws/aws.go new file mode 100644 index 0000000..5430fb2 --- /dev/null +++ b/pkg/providers/aws/aws.go @@ -0,0 +1,193 @@ +package aws + +import ( + "context" + "encoding/base64" + "fmt" + "time" + + awssdk "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/eks" + "github.com/aws/aws-sdk-go/service/sts" + "github.com/weaveworks/cluster-reflector-controller/pkg/providers" + corev1 "k8s.io/api/core/v1" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const nodeClusterNameLabel = "alpha.eksctl.io/cluster-name" + +// eksAPI is the parts of the EKS API we're interested in. +// Used for listing clusters and getting their cert info. +type eksAPI interface { + ListClusters(input *eks.ListClustersInput) (*eks.ListClustersOutput, error) + DescribeCluster(input *eks.DescribeClusterInput) (*eks.DescribeClusterOutput, error) +} + +// stsAPI is the parts of the STS API we're interested in. +// Used for generating tokens +type stsAPI interface { + GetCallerIdentityRequest(input *sts.GetCallerIdentityInput) (req *request.Request, output *sts.GetCallerIdentityOutput) +} + +// awsAPIs is the parts of the AWS API we're interested in. +type awsAPIs struct { + eksAPI + stsAPI +} + +// AWSProvider queries all EKS clusters for the provided AWS region and +// returns the clusters and kubeconfigs for the clusters. +type AWSProvider struct { + Region string + ClientFactory func(string) (*awsAPIs, error) +} + +// NewAWSProvider creates and returns an AWSProvider ready for use. +func NewAWSProvider(region string) *AWSProvider { + return &AWSProvider{ + Region: region, + ClientFactory: clientFactory, + } +} + +// ListClusters returns a list of clusters and kubeconfigs for the clusters. +func (p *AWSProvider) ListClusters(ctx context.Context) ([]*providers.ProviderCluster, error) { + client, err := p.ClientFactory(p.Region) + if err != nil { + return nil, fmt.Errorf("failed to create client: %v", err) + } + + clusters := []*providers.ProviderCluster{} + input := &eks.ListClustersInput{} + output, err := client.ListClusters(input) + if err != nil { + return nil, fmt.Errorf("failed to list EKS clusters: %v", err) + } + + for _, clusterName := range output.Clusters { + kubeConfig, err := getKubeconfigForCluster(ctx, client, *clusterName) + if err != nil { + return nil, fmt.Errorf("failed to get kubeconfig for cluster %s: %v", *clusterName, err) + } + + clusters = append(clusters, &providers.ProviderCluster{ + Name: *clusterName, + ID: *clusterName, + KubeConfig: kubeConfig, + }) + } + + return clusters, nil +} + +// ClusterID returns the ID of the cluster with the provided name. +func (p *AWSProvider) ClusterID(ctx context.Context, kubeClient client.Reader) (string, error) { + nodes := &corev1.NodeList{} + if err := kubeClient.List(ctx, nodes); err != nil { + return "", fmt.Errorf("failed to list nodes: %v", err) + } + + if len(nodes.Items) == 0 { + return "", fmt.Errorf("no nodes found") + } + + node := nodes.Items[0] + if node.Labels != nil { + if clusterID, ok := node.Labels[nodeClusterNameLabel]; ok { + return clusterID, nil + } + } + + return "", nil +} + +func getKubeconfigForCluster(ctx context.Context, client *awsAPIs, clusterName string) (*clientcmdapi.Config, error) { + describeInput := &eks.DescribeClusterInput{ + Name: awssdk.String(clusterName), + } + + describeOutput, err := client.eksAPI.DescribeCluster(describeInput) + if err != nil { + return nil, fmt.Errorf("failed to describe EKS cluster %s: %v", clusterName, err) + } + + kubeConfig, err := generateKubeconfig(describeOutput.Cluster, client.stsAPI, clusterName) + if err != nil { + return nil, fmt.Errorf("failed to generate kubeconfig for cluster %s: %v", clusterName, err) + } + + return kubeConfig, nil +} + +func generateKubeconfig(cluster *eks.Cluster, stsClient stsAPI, clusterName string) (*clientcmdapi.Config, error) { + kubeconfig := clientcmdapi.NewConfig() + + certData, err := base64.StdEncoding.DecodeString(*cluster.CertificateAuthority.Data) + if err != nil { + return nil, fmt.Errorf("decoding cluster CA cert: %w", err) + } + + kubeconfig.Clusters[clusterName] = &clientcmdapi.Cluster{ + Server: *cluster.Endpoint, + CertificateAuthorityData: certData, + } + + kubeconfig.Contexts[clusterName] = &clientcmdapi.Context{ + Cluster: clusterName, + AuthInfo: clusterName, + } + + kubeconfig.CurrentContext = clusterName + + token, err := generateToken(stsClient, clusterName) + if err != nil { + return nil, fmt.Errorf("generating presigned token: %w", err) + } + + kubeconfig.AuthInfos[clusterName] = &clientcmdapi.AuthInfo{ + Token: token, + } + + return kubeconfig, nil +} + +const ( + tokenPrefix = "k8s-aws-v1." + tokenAgeMins = 15 + clusterNameHeader = "x-k8s-aws-id" +) + +func generateToken(stsClient stsAPI, clusterName string) (string, error) { + req, _ := stsClient.GetCallerIdentityRequest(&sts.GetCallerIdentityInput{}) + req.HTTPRequest.Header.Add(clusterNameHeader, clusterName) + + presignedURL, err := req.Presign(tokenAgeMins * time.Minute) + if err != nil { + return "", fmt.Errorf("presigning AWS get caller identity: %w", err) + } + + encodedURL := base64.RawURLEncoding.EncodeToString([]byte(presignedURL)) + return fmt.Sprintf("%s%s", tokenPrefix, encodedURL), nil +} + +// this is the default client factory which just creates a set of +// aws apis +func clientFactory(region string) (*awsAPIs, error) { + // Don't specify any credentials here, let the SDK take care of it. + awsConfig := &awssdk.Config{ + Region: awssdk.String(region), + } + + session, err := session.NewSession(awsConfig) + if err != nil { + return nil, fmt.Errorf("failed to create session: %v", err) + } + + return &awsAPIs{ + eksAPI: eks.New(session), + stsAPI: sts.New(session), + }, nil +} diff --git a/pkg/providers/aws/aws_test.go b/pkg/providers/aws/aws_test.go new file mode 100644 index 0000000..056cae9 --- /dev/null +++ b/pkg/providers/aws/aws_test.go @@ -0,0 +1,201 @@ +package aws + +import ( + "context" + "encoding/base64" + "fmt" + "net/http" + "net/url" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/eks" + "github.com/aws/aws-sdk-go/service/sts" + "github.com/stretchr/testify/assert" + "github.com/weaveworks/cluster-reflector-controller/pkg/providers" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +var _ providers.Provider = (*AWSProvider)(nil) + +func TestAWSProviderListClusters(t *testing.T) { + provider := AWSProvider{ + ClientFactory: func(region string) (*awsAPIs, error) { + return &awsAPIs{eksAPI: &mockEKSAPI{}, stsAPI: &mockSTSAPI{}}, nil + }, + } + + clusters, err := provider.ListClusters(context.Background()) + assert.NoError(t, err) + assert.Len(t, clusters, 2) + expected := []*providers.ProviderCluster{ + { + Name: "cluster-1", + ID: "cluster-1", + KubeConfig: &clientcmdapi.Config{ + Extensions: map[string]runtime.Object{}, + Preferences: clientcmdapi.Preferences{ + Extensions: map[string]runtime.Object{}, + }, + CurrentContext: "cluster-1", + Clusters: map[string]*clientcmdapi.Cluster{ + "cluster-1": { + Server: "https://cluster-1-Id.us-west-2.eks.amazonaws.com", + CertificateAuthorityData: []byte("certificate-data"), + }, + }, + AuthInfos: map[string]*clientcmdapi.AuthInfo{ + "cluster-1": { + Token: "k8s-aws-v1.", + }, + }, + Contexts: map[string]*clientcmdapi.Context{ + "cluster-1": { + Cluster: "cluster-1", + AuthInfo: "cluster-1", + }, + }, + }, + }, + { + Name: "cluster-2", + ID: "cluster-2", + KubeConfig: &clientcmdapi.Config{ + Extensions: map[string]runtime.Object{}, + Preferences: clientcmdapi.Preferences{ + Extensions: map[string]runtime.Object{}, + }, + Clusters: map[string]*clientcmdapi.Cluster{ + "cluster-2": { + Server: "https://cluster-2-Id.us-west-2.eks.amazonaws.com", + CertificateAuthorityData: []byte("certificate-data"), + }, + }, + AuthInfos: map[string]*clientcmdapi.AuthInfo{ + "cluster-2": { + Token: "k8s-aws-v1.", + }, + }, + CurrentContext: "cluster-2", + Contexts: map[string]*clientcmdapi.Context{ + "cluster-2": { + Cluster: "cluster-2", + AuthInfo: "cluster-2", + }, + }, + }, + }, + } + + assert.Equal(t, expected, clusters) + +} + +func TestAWSProvider_ClusterID(t *testing.T) { + clusterIDTests := []struct { + name string + objs []client.Object + want string + }{ + { + name: "Label exists", + objs: []client.Object{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ip-192-168-111-24.eu-north-1.compute.internal", + Labels: map[string]string{ + "alpha.eksctl.io/cluster-name": "cluster-1", + }, + }, + }, + }, + want: "cluster-1", + }, + { + name: "missing label", + objs: []client.Object{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ip-192-168-111-24.eu-north-1.compute.internal", + }, + }, + }, + want: "", + }, + } + + for _, tt := range clusterIDTests { + t.Run(tt.name, func(t *testing.T) { + fc := fake.NewClientBuilder().WithObjects(tt.objs...).Build() + + provider := AWSProvider{ + ClientFactory: func(region string) (*awsAPIs, error) { + return &awsAPIs{eksAPI: &mockEKSAPI{}, stsAPI: &mockSTSAPI{}}, nil + }, + } + + clusterID, err := provider.ClusterID(context.TODO(), fc) + if err != nil { + t.Fatal(err) + } + + if clusterID != tt.want { + t.Fatalf("ClusterID() got %s, want %s", clusterID, tt.want) + } + }) + } +} + +// mocks and that + +type mockEKSAPI struct{} + +func (m *mockEKSAPI) ListClusters(input *eks.ListClustersInput) (*eks.ListClustersOutput, error) { + return &eks.ListClustersOutput{ + Clusters: aws.StringSlice([]string{"cluster-1", "cluster-2"}), + }, nil +} + +func describeClusterOutput(name string) *eks.DescribeClusterOutput { + // base64 encoded certificate data + certData := base64.StdEncoding.EncodeToString([]byte("certificate-data")) + return &eks.DescribeClusterOutput{ + Cluster: &eks.Cluster{ + Name: aws.String(name), + Id: aws.String(fmt.Sprintf("%s-Id", name)), + Endpoint: aws.String(fmt.Sprintf("https://%s-Id.us-west-2.eks.amazonaws.com", name)), + CertificateAuthority: &eks.Certificate{ + Data: aws.String(certData), + }, + }, + } +} + +func (m *mockEKSAPI) DescribeCluster(input *eks.DescribeClusterInput) (*eks.DescribeClusterOutput, error) { + if *input.Name == "cluster-1" { + return describeClusterOutput("cluster-1"), nil + } else if *input.Name == "cluster-2" { + return describeClusterOutput("cluster-2"), nil + } + + return nil, fmt.Errorf("cluster not found: %s", *input.Name) +} + +type mockSTSAPI struct{} + +func (m *mockSTSAPI) GetCallerIdentityRequest(input *sts.GetCallerIdentityInput) (*request.Request, *sts.GetCallerIdentityOutput) { + return &request.Request{ + HTTPRequest: &http.Request{ + Header: http.Header{}, + URL: &url.URL{}, + }, + Handlers: request.Handlers{}, + Operation: &request.Operation{}, + }, nil +} diff --git a/pkg/providers/capi/capi.go b/pkg/providers/capi/capi.go index a25bf2e..1149187 100644 --- a/pkg/providers/capi/capi.go +++ b/pkg/providers/capi/capi.go @@ -9,14 +9,14 @@ import ( ) type CAPIProvider struct { - Kubeclient client.Client + Kubeclient client.Reader Namespace string } var _ providers.Provider = (*CAPIProvider)(nil) // NewCAPIProvider creates and returns a CAPIProvider ready for use -func NewCAPIProvider(client client.Client, namespace string) *CAPIProvider { +func NewCAPIProvider(client client.Reader, namespace string) *CAPIProvider { provider := &CAPIProvider{ Kubeclient: client, Namespace: namespace,