From 18a5a0448596cca9830c5823a394028f2d5e3ad0 Mon Sep 17 00:00:00 2001 From: Fabrizio Pandini Date: Fri, 28 Jun 2024 14:47:08 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=8C=B1=20Ensure=20CRS=20controller=20alwa?= =?UTF-8?q?ys=20add=20ownerReference=20to=20resources=20(#10756)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Ensure CRS controller always add ownerReference to resources * Address comments * Fix race conditions * Address comments --- .../clusterresourceset_controller.go | 81 +++++++++++-------- .../clusterresourceset_controller_test.go | 51 +++++++++--- 2 files changed, 89 insertions(+), 43 deletions(-) diff --git a/exp/addons/internal/controllers/clusterresourceset_controller.go b/exp/addons/internal/controllers/clusterresourceset_controller.go index 826cd3c8e68b..51cfdc4cc535 100644 --- a/exp/addons/internal/controllers/clusterresourceset_controller.go +++ b/exp/addons/internal/controllers/clusterresourceset_controller.go @@ -276,17 +276,38 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte log := ctrl.LoggerFrom(ctx, "Cluster", klog.KObj(cluster)) ctx = ctrl.LoggerInto(ctx, log) - remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster)) - if err != nil { - conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.RemoteClusterClientFailedReason, clusterv1.ConditionSeverityError, err.Error()) - return err - } + // Iterate all resources and ensure an ownerReference to the clusterResourceSet is on the resource. + // NOTE: we have to do this before getting a remote client, otherwise owner reference won't be created until it is + // possible to connect to the remote cluster. + errList := []error{} + objList := make([]*unstructured.Unstructured, len(clusterResourceSet.Spec.Resources)) + for i, resource := range clusterResourceSet.Spec.Resources { + unstructuredObj, err := r.getResource(ctx, resource, cluster.GetNamespace()) + if err != nil { + if err == ErrSecretTypeNotSupported { + conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.WrongSecretTypeReason, clusterv1.ConditionSeverityWarning, err.Error()) + } else { + conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.RetrievingResourceFailedReason, clusterv1.ConditionSeverityWarning, err.Error()) - // Ensure that the Kubernetes API Server service has been created in the remote cluster before applying the ClusterResourceSet to avoid service IP conflict. - // This action is required when the remote cluster Kubernetes version is lower than v1.25. - // TODO: Remove this action once CAPI no longer supports Kubernetes versions below v1.25. See: https://github.com/kubernetes-sigs/cluster-api/issues/7804 - if err = ensureKubernetesServiceCreated(ctx, remoteClient); err != nil { - return errors.Wrapf(err, "failed to retrieve the Service for Kubernetes API Server of the cluster %s/%s", cluster.Namespace, cluster.Name) + // Continue without adding the error to the aggregate if we can't find the resource. + if apierrors.IsNotFound(err) { + continue + } + } + errList = append(errList, err) + continue + } + + // Ensure an ownerReference to the clusterResourceSet is on the resource. + if err := r.ensureResourceOwnerRef(ctx, clusterResourceSet, unstructuredObj); err != nil { + log.Error(err, "Failed to add ClusterResourceSet as resource owner reference", + "Resource type", unstructuredObj.GetKind(), "Resource name", unstructuredObj.GetName()) + errList = append(errList, err) + } + objList[i] = unstructuredObj + } + if len(errList) > 0 { + return kerrors.NewAggregate(errList) } // Get ClusterResourceSetBinding object for the cluster. @@ -313,32 +334,28 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte Name: clusterResourceSet.Name, UID: clusterResourceSet.UID, })) - var errList []error + resourceSetBinding := clusterResourceSetBinding.GetOrCreateBinding(clusterResourceSet) - // Iterate all resources and apply them to the cluster and update the resource status in the ClusterResourceSetBinding object. - for _, resource := range clusterResourceSet.Spec.Resources { - unstructuredObj, err := r.getResource(ctx, resource, cluster.GetNamespace()) - if err != nil { - if err == ErrSecretTypeNotSupported { - conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.WrongSecretTypeReason, clusterv1.ConditionSeverityWarning, err.Error()) - } else { - conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.RetrievingResourceFailedReason, clusterv1.ConditionSeverityWarning, err.Error()) + remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster)) + if err != nil { + conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.RemoteClusterClientFailedReason, clusterv1.ConditionSeverityError, err.Error()) + return err + } - // Continue without adding the error to the aggregate if we can't find the resource. - if apierrors.IsNotFound(err) { - continue - } - } - errList = append(errList, err) - continue - } + // Ensure that the Kubernetes API Server service has been created in the remote cluster before applying the ClusterResourceSet to avoid service IP conflict. + // This action is required when the remote cluster Kubernetes version is lower than v1.25. + // TODO: Remove this action once CAPI no longer supports Kubernetes versions below v1.25. See: https://github.com/kubernetes-sigs/cluster-api/issues/7804 + if err := ensureKubernetesServiceCreated(ctx, remoteClient); err != nil { + return errors.Wrapf(err, "failed to retrieve the Service for Kubernetes API Server of the cluster %s/%s", cluster.Namespace, cluster.Name) + } - // Ensure an ownerReference to the clusterResourceSet is on the resource. - if err := r.ensureResourceOwnerRef(ctx, clusterResourceSet, unstructuredObj); err != nil { - log.Error(err, "Failed to add ClusterResourceSet as resource owner reference", - "Resource type", unstructuredObj.GetKind(), "Resource name", unstructuredObj.GetName()) - errList = append(errList, err) + // Iterate all resources and apply them to the cluster and update the resource status in the ClusterResourceSetBinding object. + for i, resource := range clusterResourceSet.Spec.Resources { + unstructuredObj := objList[i] + if unstructuredObj == nil { + // Continue without adding the error to the aggregate if we can't find the resource. + continue } resourceScope, err := reconcileScopeForResource(clusterResourceSet, resource, resourceSetBinding, unstructuredObj) diff --git a/exp/addons/internal/controllers/clusterresourceset_controller_test.go b/exp/addons/internal/controllers/clusterresourceset_controller_test.go index a6b1436f4d45..76a7400dd977 100644 --- a/exp/addons/internal/controllers/clusterresourceset_controller_test.go +++ b/exp/addons/internal/controllers/clusterresourceset_controller_test.go @@ -345,8 +345,13 @@ metadata: } g.Eventually(func() bool { m := &corev1.ConfigMap{} - err := env.Get(ctx, cmKey, m) - return err == nil + if err := env.Get(ctx, cmKey, m); err != nil { + return false + } + if len(m.OwnerReferences) != 1 || m.OwnerReferences[0].Name != crsInstance.Name { + return false + } + return true }, timeout).Should(BeTrue()) // When the ConfigMap resource is created, CRS should get reconciled immediately. @@ -445,8 +450,13 @@ metadata: } g.Eventually(func() bool { m := &corev1.Secret{} - err := env.Get(ctx, cmKey, m) - return err == nil + if err := env.Get(ctx, cmKey, m); err != nil { + return false + } + if len(m.OwnerReferences) != 1 || m.OwnerReferences[0].Name != crsInstance.Name { + return false + } + return true }, timeout).Should(BeTrue()) // When the Secret resource is created, CRS should get reconciled immediately. @@ -911,7 +921,7 @@ metadata: g.Expect(env.Delete(ctx, missingNs)).To(Succeed()) }) - t.Run("Should only create ClusterResourceSetBinding after the remote cluster's Kubernetes API Server Service has been created", func(t *testing.T) { + t.Run("Should only apply resources after the remote cluster's Kubernetes API Server Service has been created", func(t *testing.T) { g := NewWithT(t) ns := setup(t, g) defer teardown(t, g, ns) @@ -962,6 +972,7 @@ metadata: ClusterSelector: metav1.LabelSelector{ MatchLabels: labels, }, + Resources: []addonsv1.ResourceRef{{Name: secretName, Kind: "Secret"}}, }, } // Create the ClusterResourceSet. @@ -970,13 +981,22 @@ metadata: testCluster.SetLabels(labels) g.Expect(env.Update(ctx, testCluster)).To(Succeed()) - // ClusterResourceSetBinding for the Cluster is not created because the Kubernetes API Server Service doesn't exist. + // Resources are not applied because the Kubernetes API Server Service doesn't exist. clusterResourceSetBindingKey := client.ObjectKey{Namespace: testCluster.Namespace, Name: testCluster.Name} g.Consistently(func() bool { binding := &addonsv1.ClusterResourceSetBinding{} - err := env.Get(ctx, clusterResourceSetBindingKey, binding) - return apierrors.IsNotFound(err) + if err := env.Get(ctx, clusterResourceSetBindingKey, binding); err != nil { + // either the binding is not there + return true + } + // or the binding is there but resources are not applied + for _, b := range binding.Spec.Bindings { + if len(b.Resources) > 0 { + return false + } + } + return true }, timeout).Should(BeTrue()) t.Log("Create Kubernetes API Server Service") @@ -990,10 +1010,19 @@ metadata: g.Expect(env.Patch(ctx, clusterResourceSetInstance, client.MergeFrom(clusterResourceSetInstance.DeepCopy()))).To(Succeed()) // Wait until ClusterResourceSetBinding is created for the Cluster - g.Eventually(func(g Gomega) { + g.Eventually(func() bool { + // the binding must exists and track resource being applied binding := &addonsv1.ClusterResourceSetBinding{} - g.Expect(env.Get(ctx, clusterResourceSetBindingKey, binding)).Should(Succeed()) - }, timeout).Should(Succeed()) + if err := env.Get(ctx, clusterResourceSetBindingKey, binding); err != nil { + return false + } + for _, b := range binding.Spec.Bindings { + if len(b.Resources) == 0 { + return false + } + } + return len(binding.Spec.Bindings) != 0 + }, timeout).Should(BeTrue()) }) t.Run("Should handle applying multiple ClusterResourceSets concurrently to the same cluster", func(t *testing.T) {