Skip to content

Commit

Permalink
the rate limiter was integrated into resourses of kafkamanagement group
Browse files Browse the repository at this point in the history
  • Loading branch information
Bohdan Siryk authored and Bohdan Siryk committed Oct 19, 2023
1 parent 4d5d744 commit 0861ef7
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 96 deletions.
2 changes: 1 addition & 1 deletion controllers/clusters/zookeeper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (r *ZookeeperReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if k8serrors.IsNotFound(err) {
l.Info("Zookeeper resource is not found",
"request", req)
return models.ExitReconcile, nil
return ctrl.Result{}, nil
}

l.Error(err, "unable to fetch Zookeeper",
Expand Down
49 changes: 28 additions & 21 deletions controllers/kafkamanagement/kafkaacl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/instaclustr/operator/apis/kafkamanagement/v1beta1"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
)

// KafkaACLReconciler reconciles a KafkaACL object
Expand Down Expand Up @@ -64,37 +66,37 @@ func (r *KafkaACLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if err != nil {
if k8serrors.IsNotFound(err) {
l.Error(err, "Kafka ACL resource is not found", "request", req)
return models.ExitReconcile, nil
return ctrl.Result{}, nil
}

l.Error(err, "Unable to fetch kafka ACL", "request", req)
return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

switch kafkaACL.Annotations[models.ResourceStateAnnotation] {
case models.CreatingEvent:
return r.handleCreateKafkaACL(ctx, &kafkaACL, l), nil
return r.handleCreateKafkaACL(ctx, &kafkaACL, l)

case models.UpdatingEvent:
return r.handleUpdateKafkaACL(ctx, &kafkaACL, l), nil
return r.handleUpdateKafkaACL(ctx, &kafkaACL, l)

case models.DeletingEvent:
return r.handleDeleteKafkaACL(ctx, &kafkaACL, l), nil
return r.handleDeleteKafkaACL(ctx, &kafkaACL, l)
default:
l.Info("Event isn't handled",
"cluster ID", kafkaACL.Spec.ClusterID,
"user query", kafkaACL.Spec.UserQuery,
"request", req,
"event", kafkaACL.Annotations[models.ResourceStateAnnotation])
return models.ExitReconcile, nil
return ctrl.Result{}, nil
}
}

func (r *KafkaACLReconciler) handleCreateKafkaACL(
ctx context.Context,
acl *v1beta1.KafkaACL,
l logr.Logger,
) reconcile.Result {
) (ctrl.Result, error) {
if acl.Status.ID == "" {
l.Info("Creating kafka ACL",
"cluster ID", acl.Spec.ClusterID,
Expand All @@ -111,7 +113,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL(
"Resource creation on the Instaclustr is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -133,7 +135,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL(
"Cluster resource status patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

controllerutil.AddFinalizer(acl, models.DeletionFinalizer)
Expand All @@ -149,7 +151,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

l.Info(
Expand All @@ -159,14 +161,14 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL(
)
}

return models.ExitReconcile
return ctrl.Result{}, nil
}

func (r *KafkaACLReconciler) handleUpdateKafkaACL(
ctx context.Context,
acl *v1beta1.KafkaACL,
l logr.Logger,
) reconcile.Result {
) (ctrl.Result, error) {
err := r.API.UpdateKafkaACL(acl.Status.ID, instaclustr.KafkaACLEndpoint, &acl.Spec)
if err != nil {
l.Error(err, "Cannot update kafka ACL",
Expand All @@ -178,7 +180,7 @@ func (r *KafkaACLReconciler) handleUpdateKafkaACL(
"Resource update on the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

patch := acl.NewPatch()
Expand All @@ -194,21 +196,21 @@ func (r *KafkaACLReconciler) handleUpdateKafkaACL(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

l.Info("Kafka ACL has been updated",
"cluster ID", acl.Spec.ClusterID,
"user query", acl.Spec.UserQuery,
)
return models.ExitReconcile
return ctrl.Result{}, nil
}

func (r *KafkaACLReconciler) handleDeleteKafkaACL(
ctx context.Context,
acl *v1beta1.KafkaACL,
l logr.Logger,
) reconcile.Result {
) (reconcile.Result, error) {
patch := acl.NewPatch()
err := r.Patch(ctx, acl, patch)
if err != nil {
Expand All @@ -221,7 +223,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

status, err := r.API.GetKafkaACLStatus(acl.Status.ID, instaclustr.KafkaACLEndpoint)
Expand All @@ -236,7 +238,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Resource fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

if status != nil {
Expand All @@ -251,7 +253,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Resource deletion is failed on the Instaclustr. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}
r.EventRecorder.Eventf(
acl, models.Normal, models.DeletionStarted,
Expand All @@ -272,7 +274,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

l.Info("Kafka ACL has been deleted",
Expand All @@ -285,12 +287,17 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Resource is deleted",
)

return models.ExitReconcile
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *KafkaACLReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(
ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay,
),
}).
For(&v1beta1.KafkaACL{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
if event.Object.GetDeletionTimestamp() != nil {
Expand Down
44 changes: 25 additions & 19 deletions controllers/kafkamanagement/kafkauser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -46,6 +47,7 @@ import (
"github.com/instaclustr/operator/apis/kafkamanagement/v1beta1"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
)

const (
Expand Down Expand Up @@ -78,7 +80,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
if k8serrors.IsNotFound(err) {
l.Info("Kafka user resource is not found", "request", req)
return models.ExitReconcile, nil
return ctrl.Result{}, nil
}
l.Error(err, "Unable to fetch Kafka user", "request", req)
r.EventRecorder.Eventf(
Expand All @@ -87,7 +89,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
err,
)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

secret := &v1.Secret{}
Expand All @@ -101,7 +103,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
r.EventRecorder.Eventf(user, models.Warning, models.FetchFailed,
"Fetch user credentials secret is failed. Reason: %v", err)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

username, password, err := r.getKafkaUserCredsFromSecret(user.Spec)
Expand All @@ -110,7 +112,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
r.EventRecorder.Eventf(user, models.Warning, models.FetchFailed,
"Fetch user credentials from secret is failed. Reason: %v", err)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

if controllerutil.AddFinalizer(secret, user.GetDeletionFinalizer()) {
Expand All @@ -122,7 +124,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
r.EventRecorder.Eventf(user, models.Warning, models.UpdatedEvent,
"Cannot assign Kafka user to a k8s secret. Reason: %v", err)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}
}

Expand All @@ -135,7 +137,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
user, models.Warning, models.PatchFailed,
"Patching Kafka user with finalizer has been failed. Reason: %v", err)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}
}

Expand All @@ -158,7 +160,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
err,
)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -185,7 +187,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
err,
)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

l.Info(
Expand All @@ -210,7 +212,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
err,
)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -232,7 +234,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
err,
)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

l.Info("Kafka user has been deleted",
Expand Down Expand Up @@ -260,7 +262,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
err,
)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

l.Info("Kafka user has been detached",
Expand All @@ -272,7 +274,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
)

continue

}

iKafkaUser := user.Spec.ToInstAPI(clusterID, username, password)
Expand All @@ -288,7 +289,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
err,
)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

user.Annotations[models.ResourceStateAnnotation] = models.UpdatedEvent
Expand All @@ -304,7 +305,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
err,
)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

user.Status.ClustersEvents[clusterID] = models.UpdatedEvent
Expand All @@ -320,7 +321,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
err,
)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

l.Info("Kafka user resource has been updated",
Expand All @@ -337,7 +338,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
"username", username, "cluster ID", clusterID)
r.EventRecorder.Event(user, models.Warning, models.DeletingEvent, instaclustr.MsgDeleteUser)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}
}

Expand All @@ -350,7 +351,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
"Deleting finalizer from the Kafka user resource has been failed. Reason: %v", err,
)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

controllerutil.RemoveFinalizer(secret, user.GetDeletionFinalizer())
Expand All @@ -361,13 +362,13 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
r.EventRecorder.Eventf(user, models.Warning, models.PatchFailed,
"Resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

l.Info("Kafka user resource has been deleted", "username", username)
}

return models.ExitReconcile, nil
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -391,6 +392,11 @@ func (r *KafkaUserReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(
ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay,
),
}).
For(&v1beta1.KafkaUser{}, builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(event event.UpdateEvent) bool {
newObj := event.ObjectNew.(*v1beta1.KafkaUser)
Expand Down
Loading

0 comments on commit 0861ef7

Please sign in to comment.