Skip to content

Commit

Permalink
custom rate limiter for all cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
ribaraka committed Oct 19, 2023
1 parent e9cccef commit c5a768f
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 227 deletions.
2 changes: 1 addition & 1 deletion controllers/clusters/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (r *CassandraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

l.Error(err, "Unable to fetch Cassandra cluster",
"request", req)
return models.ReconcileRequeue, err
return reconcile.Result{}, err
}

switch cassandra.Annotations[models.ResourceStateAnnotation] {
Expand Down
79 changes: 40 additions & 39 deletions controllers/clusters/kafka_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
"github.com/instaclustr/operator/pkg/scheduler"
)

Expand Down Expand Up @@ -73,19 +75,16 @@ func (r *KafkaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}

l.Error(err, "Unable to fetch Kafka", "request", req)
return models.ReconcileRequeue, err
return reconcile.Result{}, err
}

switch kafka.Annotations[models.ResourceStateAnnotation] {
case models.CreatingEvent:
return r.handleCreateCluster(ctx, &kafka, l), nil

return r.handleCreateCluster(ctx, &kafka, l)
case models.UpdatingEvent:
return r.handleUpdateCluster(ctx, &kafka, l), nil

return r.handleUpdateCluster(ctx, &kafka, l)
case models.DeletingEvent:
return r.handleDeleteCluster(ctx, &kafka, l), nil

return r.handleDeleteCluster(ctx, &kafka, l)
case models.GenericEvent:
l.Info("Event isn't handled", "cluster name", kafka.Spec.Name, "request", req,
"event", kafka.Annotations[models.ResourceStateAnnotation])
Expand All @@ -95,7 +94,7 @@ func (r *KafkaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
return models.ExitReconcile, nil
}

func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger) reconcile.Result {
func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger) (reconcile.Result, error) {
l = l.WithName("Kafka creation Event")

var err error
Expand All @@ -115,7 +114,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta
"Cluster creation on the Instaclustr is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -134,7 +133,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta
"Cluster resource status patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

kafka.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent
Expand All @@ -149,7 +148,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("Cluster has been created",
Expand All @@ -167,7 +166,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta
"Cluster status check job creation is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -182,7 +181,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta
r.EventRecorder.Eventf(kafka, models.Warning, models.CreationFailed,
"User creation job is failed. Reason: %v", err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Event(kafka, models.Normal, models.Created,
Expand All @@ -191,26 +190,26 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta
}
}

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *KafkaReconciler) handleUpdateCluster(
ctx context.Context,
k *v1beta1.Kafka,
l logr.Logger,
) reconcile.Result {
) (reconcile.Result, error) {
l = l.WithName("Kafka update Event")

iData, err := r.API.GetKafka(k.Status.ID)
if err != nil {
l.Error(err, "Cannot get cluster from the Instaclustr", "cluster ID", k.Status.ID)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

iKafka, err := k.FromInstAPI(iData)
if err != nil {
l.Error(err, "Cannot convert cluster from the Instaclustr API", "cluster ID", k.Status.ID)
return models.ExitReconcile
return reconcile.Result{}, err
}

if iKafka.Status.ClusterStatus.State != StatusRUNNING {
Expand All @@ -230,9 +229,9 @@ func (r *KafkaReconciler) handleUpdateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}
return models.ReconcileRequeue
return reconcile.Result{}, err
}

if k.Annotations[models.ExternalChangesAnnotation] == models.True {
Expand All @@ -251,12 +250,12 @@ func (r *KafkaReconciler) handleUpdateCluster(
r.EventRecorder.Eventf(k, models.Warning, models.UpdateFailed,
"Cannot update cluster settings. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

if k.Spec.IsEqual(iKafka.Spec) {
return models.ExitReconcile
return models.ExitReconcile, nil
}

l.Info("Update request to Instaclustr API has been sent",
Expand Down Expand Up @@ -284,10 +283,10 @@ func (r *KafkaReconciler) handleUpdateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

return models.ReconcileRequeue
return reconcile.Result{}, err
}

patch := k.NewPatch()
Expand All @@ -301,7 +300,7 @@ func (r *KafkaReconciler) handleUpdateCluster(
r.EventRecorder.Eventf(k, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(
Expand All @@ -311,7 +310,7 @@ func (r *KafkaReconciler) handleUpdateCluster(
"data centres", k.Spec.DataCentres,
)

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *KafkaReconciler) handleCreateUser(
Expand Down Expand Up @@ -531,7 +530,7 @@ func (r *KafkaReconciler) handleUserEvent(
}
}

func (r *KafkaReconciler) handleExternalChanges(k, ik *v1beta1.Kafka, l logr.Logger) reconcile.Result {
func (r *KafkaReconciler) handleExternalChanges(k, ik *v1beta1.Kafka, l logr.Logger) (reconcile.Result, error) {
if !k.Spec.IsEqual(ik.Spec) {
l.Info("The k8s specification is different from Instaclustr Console. Update operations are blocked.",
"specification of k8s resource", k.Spec,
Expand All @@ -541,10 +540,10 @@ func (r *KafkaReconciler) handleExternalChanges(k, ik *v1beta1.Kafka, l logr.Log
if err != nil {
l.Error(err, "Cannot create specification difference message",
"instaclustr data", ik.Spec, "k8s resource spec", k.Spec)
return models.ExitReconcile
return models.ExitReconcile, nil
}
r.EventRecorder.Eventf(k, models.Warning, models.ExternalChanges, msgDiffSpecs)
return models.ExitReconcile
return models.ExitReconcile, nil
}

patch := k.NewPatch()
Expand All @@ -559,16 +558,16 @@ func (r *KafkaReconciler) handleExternalChanges(k, ik *v1beta1.Kafka, l logr.Log
r.EventRecorder.Eventf(k, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("External changes have been reconciled", "kafka ID", k.Status.ID)
r.EventRecorder.Event(k, models.Normal, models.ExternalChanges, "External changes have been reconciled")

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger) reconcile.Result {
func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger) (reconcile.Result, error) {
l = l.WithName("Kafka deletion Event")

_, err := r.API.GetKafka(kafka.Status.ID)
Expand All @@ -581,7 +580,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta
"Cluster resource fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

patch := kafka.NewPatch()
Expand All @@ -600,7 +599,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta
"Cluster deletion is failed on the Instaclustr. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -621,23 +620,23 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", kafka.Status.ID)

r.EventRecorder.Event(kafka, models.Normal, models.DeletionStarted,
"Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.")

return models.ExitReconcile
return models.ExitReconcile, nil
}
}

for _, ref := range kafka.Spec.UserRefs {
err = r.detachUser(ctx, kafka, l, ref)
if err != nil {

return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

Expand All @@ -654,7 +653,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

err = exposeservice.Delete(r.Client, kafka.Name, kafka.Namespace)
Expand All @@ -664,7 +663,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta
"cluster name", kafka.Spec.Name,
)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("Cluster was deleted",
Expand All @@ -676,7 +675,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta
"Cluster resource is deleted",
)

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *KafkaReconciler) startClusterStatusJob(kafka *v1beta1.Kafka) error {
Expand Down Expand Up @@ -915,6 +914,8 @@ func (r *KafkaReconciler) handleExternalDelete(ctx context.Context, kafka *v1bet
// SetupWithManager sets up the controller with the Manager.
func (r *KafkaReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}).
For(&v1beta1.Kafka{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
annots := event.Object.GetAnnotations()
Expand Down
Loading

0 comments on commit c5a768f

Please sign in to comment.