Skip to content

Commit

Permalink
issue-559, cadence controller integration of the rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
Bohdan Siryk authored and Bohdan Siryk committed Oct 18, 2023
1 parent 4d5d744 commit e04924f
Show file tree
Hide file tree
Showing 11 changed files with 557 additions and 58 deletions.
89 changes: 48 additions & 41 deletions controllers/clusters/cadence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
"github.com/instaclustr/operator/pkg/scheduler"
)

Expand Down Expand Up @@ -73,44 +75,44 @@ func (r *CadenceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
logger.Info("Cadence resource is not found",
"resource name", req.NamespacedName,
)
return models.ExitReconcile, nil
return reconcile.Result{}, nil
}

logger.Error(err, "Unable to fetch Cadence resource",
"resource name", req.NamespacedName,
)
return models.ReconcileRequeue, nil
return reconcile.Result{}, err
}

switch cadenceCluster.Annotations[models.ResourceStateAnnotation] {
case models.CreatingEvent:
return r.HandleCreateCluster(ctx, cadenceCluster, logger), nil
return r.HandleCreateCluster(ctx, cadenceCluster, logger)
case models.UpdatingEvent:
return r.HandleUpdateCluster(ctx, cadenceCluster, logger), nil
return r.HandleUpdateCluster(ctx, cadenceCluster, logger)
case models.DeletingEvent:
return r.HandleDeleteCluster(ctx, cadenceCluster, logger), nil
return r.HandleDeleteCluster(ctx, cadenceCluster, logger)
case models.GenericEvent:
logger.Info("Generic event isn't handled",
"request", req,
"event", cadenceCluster.Annotations[models.ResourceStateAnnotation],
)

return models.ExitReconcile, nil
return reconcile.Result{}, nil
default:
logger.Info("Unknown event isn't handled",
"request", req,
"event", cadenceCluster.Annotations[models.ResourceStateAnnotation],
)

return models.ExitReconcile, nil
return reconcile.Result{}, nil
}
}

func (r *CadenceReconciler) HandleCreateCluster(
ctx context.Context,
cadence *v1beta1.Cadence,
logger logr.Logger,
) reconcile.Result {
) (ctrl.Result, error) {
if cadence.Status.ID == "" {
patch := cadence.NewPatch()

Expand All @@ -124,7 +126,7 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.CreationFailed,
"Cannot prepare packaged solution for Cadence cluster. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

if requeueNeeded {
Expand All @@ -134,7 +136,7 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Event(cadence, models.Normal, "Waiting",
"Waiting for bundled clusters to be created")

return models.ReconcileRequeue
return models.ReconcileRequeue, nil
}
}

Expand All @@ -152,7 +154,7 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.ConvertionFailed,
"Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

id, err := r.API.CreateCluster(instaclustr.CadenceEndpoint, cadenceAPISpec)
Expand All @@ -164,7 +166,7 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.CreationFailed,
"Cluster creation on the Instaclustr is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

cadence.Status.ID = id
Expand All @@ -178,7 +180,7 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.PatchFailed,
"Cluster resource status patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

if cadence.Spec.Description != "" {
Expand Down Expand Up @@ -206,7 +208,7 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.PatchFailed,
"Cluster resource status patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

logger.Info(
Expand All @@ -232,21 +234,21 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.CreationFailed,
"Cluster status check job is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Event(cadence, models.Normal, models.Created,
"Cluster status check job is started")
}

return models.ExitReconcile
return reconcile.Result{}, nil
}

func (r *CadenceReconciler) HandleUpdateCluster(
ctx context.Context,
cadence *v1beta1.Cadence,
logger logr.Logger,
) reconcile.Result {
) (ctrl.Result, error) {
iData, err := r.API.GetCadence(cadence.Status.ID)
if err != nil {
logger.Error(
Expand All @@ -258,7 +260,7 @@ func (r *CadenceReconciler) HandleUpdateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.FetchFailed,
"Cluster fetch from the Instaclustr API is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

iCadence, err := cadence.FromInstAPI(iData)
Expand All @@ -272,7 +274,7 @@ func (r *CadenceReconciler) HandleUpdateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.ConvertionFailed,
"Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

if iCadence.Status.CurrentClusterOperationStatus != models.NoOperation {
Expand All @@ -294,10 +296,10 @@ func (r *CadenceReconciler) HandleUpdateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

return models.ReconcileRequeue
return models.ReconcileRequeue, nil
}

if cadence.Annotations[models.ExternalChangesAnnotation] == models.True {
Expand All @@ -316,7 +318,7 @@ func (r *CadenceReconciler) HandleUpdateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.UpdateFailed,
"Cannot update cluster settings. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

Expand All @@ -334,7 +336,7 @@ func (r *CadenceReconciler) HandleUpdateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.UpdateFailed,
"Cluster update on the Instaclustr API is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

patch := cadence.NewPatch()
Expand All @@ -349,7 +351,7 @@ func (r *CadenceReconciler) HandleUpdateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

logger.Info(
Expand All @@ -359,10 +361,10 @@ func (r *CadenceReconciler) HandleUpdateCluster(
"data centres", cadence.Spec.DataCentres,
)

return models.ExitReconcile
return reconcile.Result{}, nil
}

func (r *CadenceReconciler) handleExternalChanges(cadence, iCadence *v1beta1.Cadence, l logr.Logger) reconcile.Result {
func (r *CadenceReconciler) handleExternalChanges(cadence, iCadence *v1beta1.Cadence, l logr.Logger) (reconcile.Result, error) {
if !cadence.Spec.AreDCsEqual(iCadence.Spec.DataCentres) {
l.Info(msgExternalChanges,
"instaclustr data", iCadence.Spec.DataCentres,
Expand All @@ -372,11 +374,11 @@ func (r *CadenceReconciler) handleExternalChanges(cadence, iCadence *v1beta1.Cad
if err != nil {
l.Error(err, "Cannot create specification difference message",
"instaclustr data", iCadence.Spec, "k8s resource spec", cadence.Spec)
return models.ExitReconcile
return reconcile.Result{}, err
}
r.EventRecorder.Eventf(cadence, models.Warning, models.ExternalChanges, msgDiffSpecs)

return models.ExitReconcile
return reconcile.Result{}, nil
}

patch := cadence.NewPatch()
Expand All @@ -391,20 +393,20 @@ func (r *CadenceReconciler) handleExternalChanges(cadence, iCadence *v1beta1.Cad
r.EventRecorder.Eventf(cadence, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("External changes have been reconciled", "resource ID", cadence.Status.ID)
r.EventRecorder.Event(cadence, models.Normal, models.ExternalChanges, "External changes have been reconciled")

return models.ExitReconcile
return reconcile.Result{}, nil
}

func (r *CadenceReconciler) HandleDeleteCluster(
ctx context.Context,
cadence *v1beta1.Cadence,
logger logr.Logger,
) reconcile.Result {
) (reconcile.Result, error) {
_, err := r.API.GetCadence(cadence.Status.ID)
if err != nil && !errors.Is(err, instaclustr.NotFound) {
logger.Error(
Expand All @@ -416,7 +418,7 @@ func (r *CadenceReconciler) HandleDeleteCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.FetchFailed,
"Cluster resource fetch from the Instaclustr API is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

if !errors.Is(err, instaclustr.NotFound) {
Expand All @@ -434,7 +436,7 @@ func (r *CadenceReconciler) HandleDeleteCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.DeletionFailed,
"Cluster deletion is failed on the Instaclustr. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Event(cadence, models.Normal, models.DeletionStarted,
Expand All @@ -454,18 +456,16 @@ func (r *CadenceReconciler) HandleDeleteCluster(
"Cluster resource patch is failed. Reason: %v",
err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

logger.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", cadence.Status.ID)

r.EventRecorder.Event(cadence, models.Normal, models.DeletionStarted,
"Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.")

return models.ExitReconcile
return reconcile.Result{}, nil
}

return models.ReconcileRequeue
}

logger.Info("Cadence cluster is being deleted",
Expand All @@ -484,7 +484,7 @@ func (r *CadenceReconciler) HandleDeleteCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.DeletionFailed,
"Cannot delete Cadence packaged resources. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

Expand All @@ -499,7 +499,7 @@ func (r *CadenceReconciler) HandleDeleteCluster(
"cluster name", cadence.Spec.Name,
"patch", patch,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

err = exposeservice.Delete(r.Client, cadence.Name, cadence.Namespace)
Expand All @@ -509,7 +509,7 @@ func (r *CadenceReconciler) HandleDeleteCluster(
"cluster name", cadence.Spec.Name,
)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

logger.Info("Cadence cluster was deleted",
Expand All @@ -519,7 +519,7 @@ func (r *CadenceReconciler) HandleDeleteCluster(

r.EventRecorder.Event(cadence, models.Normal, models.Deleted, "Cluster resource is deleted")

return models.ExitReconcile
return reconcile.Result{}, nil
}

func (r *CadenceReconciler) preparePackagedSolution(
Expand Down Expand Up @@ -851,6 +851,7 @@ func (r *CadenceReconciler) newWatchStatusJob(cadence *v1beta1.Cadence) schedule
}

if iCadence.Status.CurrentClusterOperationStatus == models.NoOperation &&
cadence.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent &&
cadence.Annotations[models.UpdateQueuedAnnotation] != models.True &&
!cadence.Spec.AreDCsEqual(iCadence.Spec.DataCentres) {
l.Info(msgExternalChanges,
Expand Down Expand Up @@ -1161,6 +1162,12 @@ func areSecondaryCadenceTargetsEqual(k8sTargets, iTargets []*v1beta1.TargetCaden
// SetupWithManager sets up the controller with the Manager.
func (r *CadenceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(
ratelimiter.DefaultBaseDelay,
ratelimiter.DefaultMaxDelay,
),
}).
For(&v1beta1.Cadence{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
if deleting := confirmDeletion(event.Object); deleting {
Expand Down
Loading

0 comments on commit e04924f

Please sign in to comment.