Skip to content

Commit

Permalink
the rate limiter was integrated into resourses of kafkamanagement group
Browse files Browse the repository at this point in the history
  • Loading branch information
Bohdan Siryk authored and ribaraka committed Oct 20, 2023
1 parent f98094f commit 26eee3c
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 103 deletions.
2 changes: 1 addition & 1 deletion controllers/clusters/zookeeper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r *ZookeeperReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if k8serrors.IsNotFound(err) {
l.Info("Zookeeper resource is not found",
"request", req)
return models.ExitReconcile, nil
return ctrl.Result{}, nil
}

l.Error(err, "unable to fetch Zookeeper",
Expand Down
50 changes: 28 additions & 22 deletions controllers/kafkamanagement/kafkaacl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/instaclustr/operator/apis/kafkamanagement/v1beta1"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
)

// KafkaACLReconciler reconciles a KafkaACL object
Expand Down Expand Up @@ -64,37 +65,37 @@ func (r *KafkaACLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if err != nil {
if k8serrors.IsNotFound(err) {
l.Error(err, "Kafka ACL resource is not found", "request", req)
return models.ExitReconcile, nil
return ctrl.Result{}, nil
}

l.Error(err, "Unable to fetch kafka ACL", "request", req)
return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

switch kafkaACL.Annotations[models.ResourceStateAnnotation] {
case models.CreatingEvent:
return r.handleCreateKafkaACL(ctx, &kafkaACL, l), nil
return r.handleCreateKafkaACL(ctx, &kafkaACL, l)

case models.UpdatingEvent:
return r.handleUpdateKafkaACL(ctx, &kafkaACL, l), nil
return r.handleUpdateKafkaACL(ctx, &kafkaACL, l)

case models.DeletingEvent:
return r.handleDeleteKafkaACL(ctx, &kafkaACL, l), nil
return r.handleDeleteKafkaACL(ctx, &kafkaACL, l)
default:
l.Info("Event isn't handled",
"cluster ID", kafkaACL.Spec.ClusterID,
"user query", kafkaACL.Spec.UserQuery,
"request", req,
"event", kafkaACL.Annotations[models.ResourceStateAnnotation])
return models.ExitReconcile, nil
return ctrl.Result{}, nil
}
}

func (r *KafkaACLReconciler) handleCreateKafkaACL(
ctx context.Context,
acl *v1beta1.KafkaACL,
l logr.Logger,
) reconcile.Result {
) (ctrl.Result, error) {
if acl.Status.ID == "" {
l.Info("Creating kafka ACL",
"cluster ID", acl.Spec.ClusterID,
Expand All @@ -111,7 +112,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL(
"Resource creation on the Instaclustr is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -133,7 +134,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL(
"Cluster resource status patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

controllerutil.AddFinalizer(acl, models.DeletionFinalizer)
Expand All @@ -149,7 +150,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

l.Info(
Expand All @@ -159,14 +160,14 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL(
)
}

return models.ExitReconcile
return ctrl.Result{}, nil
}

func (r *KafkaACLReconciler) handleUpdateKafkaACL(
ctx context.Context,
acl *v1beta1.KafkaACL,
l logr.Logger,
) reconcile.Result {
) (ctrl.Result, error) {
err := r.API.UpdateKafkaACL(acl.Status.ID, instaclustr.KafkaACLEndpoint, &acl.Spec)
if err != nil {
l.Error(err, "Cannot update kafka ACL",
Expand All @@ -178,7 +179,7 @@ func (r *KafkaACLReconciler) handleUpdateKafkaACL(
"Resource update on the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

patch := acl.NewPatch()
Expand All @@ -194,21 +195,21 @@ func (r *KafkaACLReconciler) handleUpdateKafkaACL(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

l.Info("Kafka ACL has been updated",
"cluster ID", acl.Spec.ClusterID,
"user query", acl.Spec.UserQuery,
)
return models.ExitReconcile
return ctrl.Result{}, nil
}

func (r *KafkaACLReconciler) handleDeleteKafkaACL(
ctx context.Context,
acl *v1beta1.KafkaACL,
l logr.Logger,
) reconcile.Result {
) (ctrl.Result, error) {
patch := acl.NewPatch()
err := r.Patch(ctx, acl, patch)
if err != nil {
Expand All @@ -221,7 +222,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

status, err := r.API.GetKafkaACLStatus(acl.Status.ID, instaclustr.KafkaACLEndpoint)
Expand All @@ -236,7 +237,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Resource fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

if status != nil {
Expand All @@ -251,7 +252,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Resource deletion is failed on the Instaclustr. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}
r.EventRecorder.Eventf(
acl, models.Normal, models.DeletionStarted,
Expand All @@ -272,7 +273,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

l.Info("Kafka ACL has been deleted",
Expand All @@ -285,12 +286,17 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Resource is deleted",
)

return models.ExitReconcile
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *KafkaACLReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(
ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay,
),
}).
For(&v1beta1.KafkaACL{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
if event.Object.GetDeletionTimestamp() != nil {
Expand Down
Loading

0 comments on commit 26eee3c

Please sign in to comment.