Skip to content

Commit

Permalink
🌱 Ensure CRS controller always add ownerReference to resources (#10756)
Browse files Browse the repository at this point in the history
* Ensure CRS controller always add ownerReference to resources

* Address comments

* Fix race conditions

* Address comments
  • Loading branch information
fabriziopandini authored Jun 28, 2024
1 parent 72a3520 commit 18a5a04
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 43 deletions.
81 changes: 49 additions & 32 deletions exp/addons/internal/controllers/clusterresourceset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,17 +276,38 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte
log := ctrl.LoggerFrom(ctx, "Cluster", klog.KObj(cluster))
ctx = ctrl.LoggerInto(ctx, log)

remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
if err != nil {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.RemoteClusterClientFailedReason, clusterv1.ConditionSeverityError, err.Error())
return err
}
// Iterate all resources and ensure an ownerReference to the clusterResourceSet is on the resource.
// NOTE: we have to do this before getting a remote client, otherwise owner reference won't be created until it is
// possible to connect to the remote cluster.
errList := []error{}
objList := make([]*unstructured.Unstructured, len(clusterResourceSet.Spec.Resources))
for i, resource := range clusterResourceSet.Spec.Resources {
unstructuredObj, err := r.getResource(ctx, resource, cluster.GetNamespace())
if err != nil {
if err == ErrSecretTypeNotSupported {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.WrongSecretTypeReason, clusterv1.ConditionSeverityWarning, err.Error())
} else {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.RetrievingResourceFailedReason, clusterv1.ConditionSeverityWarning, err.Error())

// Ensure that the Kubernetes API Server service has been created in the remote cluster before applying the ClusterResourceSet to avoid service IP conflict.
// This action is required when the remote cluster Kubernetes version is lower than v1.25.
// TODO: Remove this action once CAPI no longer supports Kubernetes versions below v1.25. See: https://github.com/kubernetes-sigs/cluster-api/issues/7804
if err = ensureKubernetesServiceCreated(ctx, remoteClient); err != nil {
return errors.Wrapf(err, "failed to retrieve the Service for Kubernetes API Server of the cluster %s/%s", cluster.Namespace, cluster.Name)
// Continue without adding the error to the aggregate if we can't find the resource.
if apierrors.IsNotFound(err) {
continue
}
}
errList = append(errList, err)
continue
}

// Ensure an ownerReference to the clusterResourceSet is on the resource.
if err := r.ensureResourceOwnerRef(ctx, clusterResourceSet, unstructuredObj); err != nil {
log.Error(err, "Failed to add ClusterResourceSet as resource owner reference",
"Resource type", unstructuredObj.GetKind(), "Resource name", unstructuredObj.GetName())
errList = append(errList, err)
}
objList[i] = unstructuredObj
}
if len(errList) > 0 {
return kerrors.NewAggregate(errList)
}

// Get ClusterResourceSetBinding object for the cluster.
Expand All @@ -313,32 +334,28 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte
Name: clusterResourceSet.Name,
UID: clusterResourceSet.UID,
}))
var errList []error

resourceSetBinding := clusterResourceSetBinding.GetOrCreateBinding(clusterResourceSet)

// Iterate all resources and apply them to the cluster and update the resource status in the ClusterResourceSetBinding object.
for _, resource := range clusterResourceSet.Spec.Resources {
unstructuredObj, err := r.getResource(ctx, resource, cluster.GetNamespace())
if err != nil {
if err == ErrSecretTypeNotSupported {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.WrongSecretTypeReason, clusterv1.ConditionSeverityWarning, err.Error())
} else {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.RetrievingResourceFailedReason, clusterv1.ConditionSeverityWarning, err.Error())
remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
if err != nil {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.RemoteClusterClientFailedReason, clusterv1.ConditionSeverityError, err.Error())
return err
}

// Continue without adding the error to the aggregate if we can't find the resource.
if apierrors.IsNotFound(err) {
continue
}
}
errList = append(errList, err)
continue
}
// Ensure that the Kubernetes API Server service has been created in the remote cluster before applying the ClusterResourceSet to avoid service IP conflict.
// This action is required when the remote cluster Kubernetes version is lower than v1.25.
// TODO: Remove this action once CAPI no longer supports Kubernetes versions below v1.25. See: https://github.com/kubernetes-sigs/cluster-api/issues/7804
if err := ensureKubernetesServiceCreated(ctx, remoteClient); err != nil {
return errors.Wrapf(err, "failed to retrieve the Service for Kubernetes API Server of the cluster %s/%s", cluster.Namespace, cluster.Name)
}

// Ensure an ownerReference to the clusterResourceSet is on the resource.
if err := r.ensureResourceOwnerRef(ctx, clusterResourceSet, unstructuredObj); err != nil {
log.Error(err, "Failed to add ClusterResourceSet as resource owner reference",
"Resource type", unstructuredObj.GetKind(), "Resource name", unstructuredObj.GetName())
errList = append(errList, err)
// Iterate all resources and apply them to the cluster and update the resource status in the ClusterResourceSetBinding object.
for i, resource := range clusterResourceSet.Spec.Resources {
unstructuredObj := objList[i]
if unstructuredObj == nil {
// Continue without adding the error to the aggregate if we can't find the resource.
continue
}

resourceScope, err := reconcileScopeForResource(clusterResourceSet, resource, resourceSetBinding, unstructuredObj)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,13 @@ metadata:
}
g.Eventually(func() bool {
m := &corev1.ConfigMap{}
err := env.Get(ctx, cmKey, m)
return err == nil
if err := env.Get(ctx, cmKey, m); err != nil {
return false
}
if len(m.OwnerReferences) != 1 || m.OwnerReferences[0].Name != crsInstance.Name {
return false
}
return true
}, timeout).Should(BeTrue())

// When the ConfigMap resource is created, CRS should get reconciled immediately.
Expand Down Expand Up @@ -445,8 +450,13 @@ metadata:
}
g.Eventually(func() bool {
m := &corev1.Secret{}
err := env.Get(ctx, cmKey, m)
return err == nil
if err := env.Get(ctx, cmKey, m); err != nil {
return false
}
if len(m.OwnerReferences) != 1 || m.OwnerReferences[0].Name != crsInstance.Name {
return false
}
return true
}, timeout).Should(BeTrue())

// When the Secret resource is created, CRS should get reconciled immediately.
Expand Down Expand Up @@ -911,7 +921,7 @@ metadata:
g.Expect(env.Delete(ctx, missingNs)).To(Succeed())
})

t.Run("Should only create ClusterResourceSetBinding after the remote cluster's Kubernetes API Server Service has been created", func(t *testing.T) {
t.Run("Should only apply resources after the remote cluster's Kubernetes API Server Service has been created", func(t *testing.T) {
g := NewWithT(t)
ns := setup(t, g)
defer teardown(t, g, ns)
Expand Down Expand Up @@ -962,6 +972,7 @@ metadata:
ClusterSelector: metav1.LabelSelector{
MatchLabels: labels,
},
Resources: []addonsv1.ResourceRef{{Name: secretName, Kind: "Secret"}},
},
}
// Create the ClusterResourceSet.
Expand All @@ -970,13 +981,22 @@ metadata:
testCluster.SetLabels(labels)
g.Expect(env.Update(ctx, testCluster)).To(Succeed())

// ClusterResourceSetBinding for the Cluster is not created because the Kubernetes API Server Service doesn't exist.
// Resources are not applied because the Kubernetes API Server Service doesn't exist.
clusterResourceSetBindingKey := client.ObjectKey{Namespace: testCluster.Namespace, Name: testCluster.Name}
g.Consistently(func() bool {
binding := &addonsv1.ClusterResourceSetBinding{}

err := env.Get(ctx, clusterResourceSetBindingKey, binding)
return apierrors.IsNotFound(err)
if err := env.Get(ctx, clusterResourceSetBindingKey, binding); err != nil {
// either the binding is not there
return true
}
// or the binding is there but resources are not applied
for _, b := range binding.Spec.Bindings {
if len(b.Resources) > 0 {
return false
}
}
return true
}, timeout).Should(BeTrue())

t.Log("Create Kubernetes API Server Service")
Expand All @@ -990,10 +1010,19 @@ metadata:
g.Expect(env.Patch(ctx, clusterResourceSetInstance, client.MergeFrom(clusterResourceSetInstance.DeepCopy()))).To(Succeed())

// Wait until ClusterResourceSetBinding is created for the Cluster
g.Eventually(func(g Gomega) {
g.Eventually(func() bool {
// the binding must exists and track resource being applied
binding := &addonsv1.ClusterResourceSetBinding{}
g.Expect(env.Get(ctx, clusterResourceSetBindingKey, binding)).Should(Succeed())
}, timeout).Should(Succeed())
if err := env.Get(ctx, clusterResourceSetBindingKey, binding); err != nil {
return false
}
for _, b := range binding.Spec.Bindings {
if len(b.Resources) == 0 {
return false
}
}
return len(binding.Spec.Bindings) != 0
}, timeout).Should(BeTrue())
})

t.Run("Should handle applying multiple ClusterResourceSets concurrently to the same cluster", func(t *testing.T) {
Expand Down

0 comments on commit 18a5a04

Please sign in to comment.