diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index 04db4671b..34498e1cb 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -84,7 +84,7 @@ func (r *CassandraReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( l.Error(err, "Unable to fetch Cassandra cluster", "request", req) - return models.ReconcileRequeue, err + return reconcile.Result{}, err } switch cassandra.Annotations[models.ResourceStateAnnotation] { diff --git a/controllers/clusters/kafka_controller.go b/controllers/clusters/kafka_controller.go index f257df0ae..a24e576da 100644 --- a/controllers/clusters/kafka_controller.go +++ b/controllers/clusters/kafka_controller.go @@ -28,6 +28,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" @@ -39,6 +40,7 @@ import ( "github.com/instaclustr/operator/pkg/exposeservice" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -73,19 +75,16 @@ func (r *KafkaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } l.Error(err, "Unable to fetch Kafka", "request", req) - return models.ReconcileRequeue, err + return reconcile.Result{}, err } switch kafka.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreateCluster(ctx, &kafka, l), nil - + return r.handleCreateCluster(ctx, &kafka, l) case models.UpdatingEvent: - return r.handleUpdateCluster(ctx, &kafka, l), nil - + return r.handleUpdateCluster(ctx, &kafka, l) case models.DeletingEvent: - return r.handleDeleteCluster(ctx, &kafka, l), nil - + return r.handleDeleteCluster(ctx, &kafka, l) case models.GenericEvent: l.Info("Event isn't handled", "cluster name", kafka.Spec.Name, "request", req, "event", kafka.Annotations[models.ResourceStateAnnotation]) @@ -95,7 +94,7 @@ func (r *KafkaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return models.ExitReconcile, nil } -func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger) reconcile.Result { +func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger) (reconcile.Result, error) { l = l.WithName("Kafka creation Event") var err error @@ -115,7 +114,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta "Cluster creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -134,7 +133,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta "Cluster resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } kafka.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent @@ -149,7 +148,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("Cluster has been created", @@ -167,7 +166,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta "Cluster status check job creation is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -182,7 +181,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta r.EventRecorder.Eventf(kafka, models.Warning, models.CreationFailed, "User creation job is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(kafka, models.Normal, models.Created, @@ -191,26 +190,26 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta } } - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *KafkaReconciler) handleUpdateCluster( ctx context.Context, k *v1beta1.Kafka, l logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { l = l.WithName("Kafka update Event") iData, err := r.API.GetKafka(k.Status.ID) if err != nil { l.Error(err, "Cannot get cluster from the Instaclustr", "cluster ID", k.Status.ID) - return models.ReconcileRequeue + return reconcile.Result{}, err } iKafka, err := k.FromInstAPI(iData) if err != nil { l.Error(err, "Cannot convert cluster from the Instaclustr API", "cluster ID", k.Status.ID) - return models.ExitReconcile + return reconcile.Result{}, err } if iKafka.Status.ClusterStatus.State != StatusRUNNING { @@ -230,9 +229,9 @@ func (r *KafkaReconciler) handleUpdateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } - return models.ReconcileRequeue + return reconcile.Result{}, err } if k.Annotations[models.ExternalChangesAnnotation] == models.True { @@ -251,12 +250,12 @@ func (r *KafkaReconciler) handleUpdateCluster( r.EventRecorder.Eventf(k, models.Warning, models.UpdateFailed, "Cannot update cluster settings. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } } if k.Spec.IsEqual(iKafka.Spec) { - return models.ExitReconcile + return models.ExitReconcile, nil } l.Info("Update request to Instaclustr API has been sent", @@ -284,10 +283,10 @@ func (r *KafkaReconciler) handleUpdateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } - return models.ReconcileRequeue + return reconcile.Result{}, err } patch := k.NewPatch() @@ -301,7 +300,7 @@ func (r *KafkaReconciler) handleUpdateCluster( r.EventRecorder.Eventf(k, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info( @@ -311,7 +310,7 @@ func (r *KafkaReconciler) handleUpdateCluster( "data centres", k.Spec.DataCentres, ) - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *KafkaReconciler) handleCreateUser( @@ -531,7 +530,7 @@ func (r *KafkaReconciler) handleUserEvent( } } -func (r *KafkaReconciler) handleExternalChanges(k, ik *v1beta1.Kafka, l logr.Logger) reconcile.Result { +func (r *KafkaReconciler) handleExternalChanges(k, ik *v1beta1.Kafka, l logr.Logger) (reconcile.Result, error) { if !k.Spec.IsEqual(ik.Spec) { l.Info("The k8s specification is different from Instaclustr Console. Update operations are blocked.", "specification of k8s resource", k.Spec, @@ -541,10 +540,10 @@ func (r *KafkaReconciler) handleExternalChanges(k, ik *v1beta1.Kafka, l logr.Log if err != nil { l.Error(err, "Cannot create specification difference message", "instaclustr data", ik.Spec, "k8s resource spec", k.Spec) - return models.ExitReconcile + return models.ExitReconcile, nil } r.EventRecorder.Eventf(k, models.Warning, models.ExternalChanges, msgDiffSpecs) - return models.ExitReconcile + return models.ExitReconcile, nil } patch := k.NewPatch() @@ -559,16 +558,16 @@ func (r *KafkaReconciler) handleExternalChanges(k, ik *v1beta1.Kafka, l logr.Log r.EventRecorder.Eventf(k, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("External changes have been reconciled", "kafka ID", k.Status.ID) r.EventRecorder.Event(k, models.Normal, models.ExternalChanges, "External changes have been reconciled") - return models.ExitReconcile + return models.ExitReconcile, nil } -func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger) reconcile.Result { +func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger) (reconcile.Result, error) { l = l.WithName("Kafka deletion Event") _, err := r.API.GetKafka(kafka.Status.ID) @@ -581,7 +580,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta "Cluster resource fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } patch := kafka.NewPatch() @@ -600,7 +599,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta "Cluster deletion is failed on the Instaclustr. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -621,7 +620,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", kafka.Status.ID) @@ -629,7 +628,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta r.EventRecorder.Event(kafka, models.Normal, models.DeletionStarted, "Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.") - return models.ExitReconcile + return models.ExitReconcile, nil } } @@ -637,7 +636,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta err = r.detachUser(ctx, kafka, l, ref) if err != nil { - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -654,7 +653,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } err = exposeservice.Delete(r.Client, kafka.Name, kafka.Namespace) @@ -664,7 +663,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta "cluster name", kafka.Spec.Name, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("Cluster was deleted", @@ -676,7 +675,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta "Cluster resource is deleted", ) - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *KafkaReconciler) startClusterStatusJob(kafka *v1beta1.Kafka) error { @@ -915,6 +914,8 @@ func (r *KafkaReconciler) handleExternalDelete(ctx context.Context, kafka *v1bet // SetupWithManager sets up the controller with the Manager. func (r *KafkaReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.Kafka{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { annots := event.Object.GetAnnotations() diff --git a/controllers/clusters/kafkaconnect_controller.go b/controllers/clusters/kafkaconnect_controller.go index a70a441bd..b2dffc139 100644 --- a/controllers/clusters/kafkaconnect_controller.go +++ b/controllers/clusters/kafkaconnect_controller.go @@ -27,6 +27,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" @@ -37,6 +38,7 @@ import ( "github.com/instaclustr/operator/pkg/exposeservice" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -71,16 +73,16 @@ func (r *KafkaConnectReconciler) Reconcile(ctx context.Context, req ctrl.Request } l.Error(err, "Unable to fetch KafkaConnect", "request", req) - return models.ReconcileRequeue, err + return reconcile.Result{}, err } switch kafkaConnect.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreateCluster(ctx, kafkaConnect, l), nil + return r.handleCreateCluster(ctx, kafkaConnect, l) case models.UpdatingEvent: - return r.handleUpdateCluster(ctx, kafkaConnect, l), nil + return r.handleUpdateCluster(ctx, kafkaConnect, l) case models.DeletingEvent: - return r.handleDeleteCluster(ctx, kafkaConnect, l), nil + return r.handleDeleteCluster(ctx, kafkaConnect, l) default: l.Info("Event isn't handled", "cluster name", kafkaConnect.Spec.Name, "request", req, "event", kafkaConnect.Annotations[models.ResourceStateAnnotation]) @@ -88,7 +90,7 @@ func (r *KafkaConnectReconciler) Reconcile(ctx context.Context, req ctrl.Request } } -func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1beta1.KafkaConnect, l logr.Logger) reconcile.Result { +func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1beta1.KafkaConnect, l logr.Logger) (reconcile.Result, error) { l = l.WithName("Creation Event") if kc.Status.ID == "" { @@ -107,7 +109,7 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1 "Cluster creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -123,7 +125,7 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1 "Cluster resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } kc.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent @@ -136,7 +138,7 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1 "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } err = r.createDefaultSecret(ctx, kc, l) @@ -151,7 +153,7 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1 err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("Kafka Connect cluster has been created", @@ -168,7 +170,7 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1 "Cluster status check job is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -177,10 +179,10 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1 ) } - return models.ExitReconcile + return models.ExitReconcile, nil } -func (r *KafkaConnectReconciler) handleUpdateCluster(ctx context.Context, kc *v1beta1.KafkaConnect, l logr.Logger) reconcile.Result { +func (r *KafkaConnectReconciler) handleUpdateCluster(ctx context.Context, kc *v1beta1.KafkaConnect, l logr.Logger) (reconcile.Result, error) { l = l.WithName("Update Event") iData, err := r.API.GetKafkaConnect(kc.Status.ID) @@ -192,7 +194,7 @@ func (r *KafkaConnectReconciler) handleUpdateCluster(ctx context.Context, kc *v1 "Cluster fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } iKC, err := kc.FromInst(iData) @@ -204,7 +206,7 @@ func (r *KafkaConnectReconciler) handleUpdateCluster(ctx context.Context, kc *v1 "Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } if kc.Annotations[models.ExternalChangesAnnotation] == models.True { @@ -223,7 +225,7 @@ func (r *KafkaConnectReconciler) handleUpdateCluster(ctx context.Context, kc *v1 r.EventRecorder.Eventf(kc, models.Warning, models.UpdateFailed, "Cannot update cluster settings. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -257,9 +259,9 @@ func (r *KafkaConnectReconciler) handleUpdateCluster(ctx context.Context, kc *v1 "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -277,7 +279,7 @@ func (r *KafkaConnectReconciler) handleUpdateCluster(ctx context.Context, kc *v1 "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info( @@ -287,10 +289,10 @@ func (r *KafkaConnectReconciler) handleUpdateCluster(ctx context.Context, kc *v1 "data centres", kc.Spec.DataCentres, ) - return models.ExitReconcile + return models.ExitReconcile, nil } -func (r *KafkaConnectReconciler) handleExternalChanges(k, ik *v1beta1.KafkaConnect, l logr.Logger) reconcile.Result { +func (r *KafkaConnectReconciler) handleExternalChanges(k, ik *v1beta1.KafkaConnect, l logr.Logger) (reconcile.Result, error) { if !k.Spec.IsEqual(ik.Spec) { l.Info(msgSpecStillNoMatch, "specification of k8s resource", k.Spec, @@ -300,10 +302,10 @@ func (r *KafkaConnectReconciler) handleExternalChanges(k, ik *v1beta1.KafkaConne if err != nil { l.Error(err, "Cannot create specification difference message", "instaclustr data", ik.Spec, "k8s resource spec", k.Spec) - return models.ExitReconcile + return models.ExitReconcile, nil } r.EventRecorder.Eventf(k, models.Warning, models.ExternalChanges, msgDiffSpecs) - return models.ExitReconcile + return models.ExitReconcile, nil } patch := k.NewPatch() @@ -318,16 +320,16 @@ func (r *KafkaConnectReconciler) handleExternalChanges(k, ik *v1beta1.KafkaConne r.EventRecorder.Eventf(k, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("External changes have been reconciled", "resource ID", k.Status.ID) r.EventRecorder.Event(k, models.Normal, models.ExternalChanges, "External changes have been reconciled") - return models.ExitReconcile + return models.ExitReconcile, nil } -func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1beta1.KafkaConnect, l logr.Logger) reconcile.Result { +func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1beta1.KafkaConnect, l logr.Logger) (reconcile.Result, error) { l = l.WithName("Deletion Event") _, err := r.API.GetKafkaConnect(kc.Status.ID) @@ -340,7 +342,7 @@ func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1 "Cluster fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } patch := kc.NewPatch() @@ -360,7 +362,7 @@ func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1 "Cluster deletion on the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(kc, models.Normal, models.DeletionStarted, @@ -378,7 +380,7 @@ func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1 "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", kc.Status.ID) @@ -386,7 +388,7 @@ func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1 r.EventRecorder.Event(kc, models.Normal, models.DeletionStarted, "Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.") - return models.ExitReconcile + return models.ExitReconcile, nil } } @@ -402,7 +404,7 @@ func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1 "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } err = exposeservice.Delete(r.Client, kc.Name, kc.Namespace) @@ -412,7 +414,7 @@ func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1 "cluster name", kc.Spec.Name, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("Kafka Connect cluster was deleted", @@ -424,7 +426,7 @@ func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1 "Cluster resource is deleted", ) - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *KafkaConnectReconciler) createDefaultSecret(ctx context.Context, kc *v1beta1.KafkaConnect, l logr.Logger) error { @@ -582,6 +584,8 @@ func (r *KafkaConnectReconciler) newWatchStatusJob(kc *v1beta1.KafkaConnect) sch // SetupWithManager sets up the controller with the Manager. func (r *KafkaConnectReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.KafkaConnect{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { event.Object.SetAnnotations(map[string]string{models.ResourceStateAnnotation: models.CreatingEvent}) diff --git a/controllers/clusters/opensearch_controller.go b/controllers/clusters/opensearch_controller.go index 76c685380..44b5f24db 100644 --- a/controllers/clusters/opensearch_controller.go +++ b/controllers/clusters/opensearch_controller.go @@ -29,6 +29,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" @@ -40,6 +41,7 @@ import ( "github.com/instaclustr/operator/pkg/exposeservice" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -78,30 +80,30 @@ func (r *OpenSearchReconciler) Reconcile(ctx context.Context, req ctrl.Request) logger.Error(err, "Unable to fetch OpenSearch cluster resource", "request", req) - return models.ReconcileRequeue, nil + return reconcile.Result{}, err } switch openSearch.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.HandleCreateCluster(ctx, openSearch, logger), nil + return r.HandleCreateCluster(ctx, openSearch, logger) case models.UpdatingEvent: - return r.HandleUpdateCluster(ctx, openSearch, logger), nil + return r.HandleUpdateCluster(ctx, openSearch, logger) case models.DeletingEvent: - return r.HandleDeleteCluster(ctx, openSearch, logger), nil + return r.HandleDeleteCluster(ctx, openSearch, logger) case models.GenericEvent: logger.Info("Opensearch resource generic event", "cluster manifest", openSearch.Spec, "request", req, "event", openSearch.Annotations[models.ResourceStateAnnotation], ) - return models.ExitReconcile, err + return models.ExitReconcile, nil default: logger.Info("OpenSearch resource event isn't handled", "cluster manifest", openSearch.Spec, "request", req, "event", openSearch.Annotations[models.ResourceStateAnnotation], ) - return models.ExitReconcile, err + return models.ExitReconcile, nil } } @@ -109,7 +111,7 @@ func (r *OpenSearchReconciler) HandleCreateCluster( ctx context.Context, o *v1beta1.OpenSearch, logger logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { logger = logger.WithName("OpenSearch creation event") var id string var err error @@ -129,7 +131,7 @@ func (r *OpenSearchReconciler) HandleCreateCluster( "Cluster restore from backup on the Instaclustr is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("OpenSearch cluster was created from backup", @@ -155,7 +157,7 @@ func (r *OpenSearchReconciler) HandleCreateCluster( r.EventRecorder.Eventf(o, models.Warning, models.CreationFailed, "Cluster creation on the Instaclustr is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("OpenSearch cluster was created", @@ -177,7 +179,7 @@ func (r *OpenSearchReconciler) HandleCreateCluster( r.EventRecorder.Eventf(o, models.Warning, models.PatchFailed, "Cluster resource status patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } o.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent controllerutil.AddFinalizer(o, models.DeletionFinalizer) @@ -190,7 +192,7 @@ func (r *OpenSearchReconciler) HandleCreateCluster( r.EventRecorder.Eventf(o, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info( @@ -209,7 +211,7 @@ func (r *OpenSearchReconciler) HandleCreateCluster( r.EventRecorder.Eventf(o, models.Warning, models.CreationFailed, "Cluster status check job is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(o, models.Normal, models.Created, @@ -223,7 +225,7 @@ func (r *OpenSearchReconciler) HandleCreateCluster( r.EventRecorder.Eventf(o, models.Warning, models.CreationFailed, "Cluster backups check job is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(o, models.Normal, models.Created, @@ -236,7 +238,7 @@ func (r *OpenSearchReconciler) HandleCreateCluster( r.EventRecorder.Eventf(o, models.Warning, models.CreationFailed, "User creation job is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(o, models.Normal, models.Created, @@ -244,14 +246,14 @@ func (r *OpenSearchReconciler) HandleCreateCluster( } } - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *OpenSearchReconciler) HandleUpdateCluster( ctx context.Context, o *v1beta1.OpenSearch, logger logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { logger = logger.WithName("OpenSearch update event") iData, err := r.API.GetOpenSearch(o.Status.ID) @@ -264,7 +266,7 @@ func (r *OpenSearchReconciler) HandleUpdateCluster( o, models.Warning, models.FetchFailed, "Cluster fetch from the Instaclustr API is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } iOpenSearch, err := o.FromInstAPI(iData) @@ -277,7 +279,7 @@ func (r *OpenSearchReconciler) HandleUpdateCluster( r.EventRecorder.Eventf(o, models.Warning, models.ConvertionFailed, "Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } if iOpenSearch.Status.State != models.RunningStatus { @@ -300,9 +302,9 @@ func (r *OpenSearchReconciler) HandleUpdateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } - return models.ReconcileRequeue + return reconcile.Result{}, err } if o.Annotations[models.ExternalChangesAnnotation] == models.True { @@ -321,7 +323,7 @@ func (r *OpenSearchReconciler) HandleUpdateCluster( r.EventRecorder.Eventf(o, models.Warning, models.UpdateFailed, "Cannot update cluster settings. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -355,9 +357,9 @@ func (r *OpenSearchReconciler) HandleUpdateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info( @@ -381,17 +383,17 @@ func (r *OpenSearchReconciler) HandleUpdateCluster( r.EventRecorder.Eventf(o, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("OpenSearch cluster was updated", "cluster name", o.Spec.Name, "cluster ID", o.Status.ID) - return models.ExitReconcile + return models.ExitReconcile, nil } -func (r *OpenSearchReconciler) handleExternalChanges(o, iO *v1beta1.OpenSearch, l logr.Logger) reconcile.Result { +func (r *OpenSearchReconciler) handleExternalChanges(o, iO *v1beta1.OpenSearch, l logr.Logger) (reconcile.Result, error) { if !o.Spec.IsEqual(iO.Spec) { l.Info(msgSpecStillNoMatch, "specification of k8s resource", o.Spec, @@ -401,11 +403,11 @@ func (r *OpenSearchReconciler) handleExternalChanges(o, iO *v1beta1.OpenSearch, if err != nil { l.Error(err, "Cannot create specification difference message", "instaclustr data", iO.Spec, "k8s resource spec", o.Spec) - return models.ExitReconcile + return models.ExitReconcile, nil } r.EventRecorder.Eventf(o, models.Warning, models.ExternalChanges, msgDiffSpecs) - return models.ExitReconcile + return models.ExitReconcile, nil } patch := o.NewPatch() @@ -420,20 +422,20 @@ func (r *OpenSearchReconciler) handleExternalChanges(o, iO *v1beta1.OpenSearch, r.EventRecorder.Eventf(o, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("External changes have been reconciled", "resource ID", o.Status.ID) r.EventRecorder.Event(o, models.Normal, models.ExternalChanges, "External changes have been reconciled") - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *OpenSearchReconciler) HandleDeleteCluster( ctx context.Context, o *v1beta1.OpenSearch, logger logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { logger = logger.WithName("OpenSearch deletion event") _, err := r.API.GetOpenSearch(o.Status.ID) @@ -445,7 +447,7 @@ func (r *OpenSearchReconciler) HandleDeleteCluster( r.EventRecorder.Eventf(o, models.Warning, models.FetchFailed, "Cluster resource fetch from the Instaclustr API is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } patch := o.NewPatch() @@ -464,7 +466,7 @@ func (r *OpenSearchReconciler) HandleDeleteCluster( r.EventRecorder.Eventf(o, models.Warning, models.DeletionFailed, "Cluster deletion is failed on the Instaclustr. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(o, models.Normal, models.DeletionStarted, @@ -481,7 +483,7 @@ func (r *OpenSearchReconciler) HandleDeleteCluster( r.EventRecorder.Eventf(o, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", o.Status.ID) @@ -489,7 +491,7 @@ func (r *OpenSearchReconciler) HandleDeleteCluster( r.EventRecorder.Event(o, models.Normal, models.DeletionStarted, "Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.") - return models.ExitReconcile + return models.ExitReconcile, nil } } @@ -510,7 +512,7 @@ func (r *OpenSearchReconciler) HandleDeleteCluster( r.EventRecorder.Eventf(o, models.Warning, models.DeletionFailed, "Cluster backups deletion is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("OpenSearch cluster backup resources were deleted", @@ -520,7 +522,7 @@ func (r *OpenSearchReconciler) HandleDeleteCluster( for _, ref := range o.Spec.UserRefs { err = r.detachUserResource(ctx, logger, o, ref) if err != nil { - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -536,7 +538,7 @@ func (r *OpenSearchReconciler) HandleDeleteCluster( r.EventRecorder.Eventf(o, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } err = exposeservice.Delete(r.Client, o.Name, o.Namespace) @@ -546,7 +548,7 @@ func (r *OpenSearchReconciler) HandleDeleteCluster( "cluster name", o.Spec.Name, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("OpenSearch cluster was deleted", @@ -557,7 +559,7 @@ func (r *OpenSearchReconciler) HandleDeleteCluster( r.EventRecorder.Event(o, models.Normal, models.Deleted, "Cluster resource is deleted") - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *OpenSearchReconciler) startClusterStatusJob(cluster *v1beta1.OpenSearch) error { @@ -1159,6 +1161,8 @@ func (r *OpenSearchReconciler) handleUserEvent( // SetupWithManager sets up the controller with the Manager. func (r *OpenSearchReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.OpenSearch{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { if deleting := confirmDeletion(event.Object); deleting { diff --git a/controllers/clusters/postgresql_controller.go b/controllers/clusters/postgresql_controller.go index d90296410..2083db2b8 100644 --- a/controllers/clusters/postgresql_controller.go +++ b/controllers/clusters/postgresql_controller.go @@ -31,6 +31,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -44,6 +45,7 @@ import ( "github.com/instaclustr/operator/pkg/exposeservice" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -91,13 +93,13 @@ func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) switch pg.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreateCluster(ctx, pg, logger), nil + return r.handleCreateCluster(ctx, pg, logger) case models.UpdatingEvent: - return r.handleUpdateCluster(ctx, pg, logger), nil + return r.handleUpdateCluster(ctx, pg, logger) case models.DeletingEvent: - return r.handleDeleteCluster(ctx, pg, logger), nil + return r.handleDeleteCluster(ctx, pg, logger) case models.SecretEvent: - return r.handleUpdateDefaultUserPassword(ctx, pg, logger), nil + return r.handleUpdateDefaultUserPassword(ctx, pg, logger) case models.GenericEvent: logger.Info("PostgreSQL resource generic event isn't handled", "cluster name", pg.Spec.Name, @@ -119,7 +121,7 @@ func (r *PostgreSQLReconciler) handleCreateCluster( ctx context.Context, pg *v1beta1.PostgreSQL, logger logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { logger = logger.WithName("PostgreSQL creation event") var err error @@ -144,7 +146,7 @@ func (r *PostgreSQLReconciler) handleCreateCluster( err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -175,7 +177,7 @@ func (r *PostgreSQLReconciler) handleCreateCluster( err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -198,7 +200,7 @@ func (r *PostgreSQLReconciler) handleCreateCluster( err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info( @@ -224,7 +226,7 @@ func (r *PostgreSQLReconciler) handleCreateCluster( pg, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } if pg.Status.State != models.DeletedStatus { @@ -239,7 +241,7 @@ func (r *PostgreSQLReconciler) handleCreateCluster( "Cluster status check job is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -258,7 +260,7 @@ func (r *PostgreSQLReconciler) handleCreateCluster( "Cluster backups check job is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -272,7 +274,7 @@ func (r *PostgreSQLReconciler) handleCreateCluster( logger.Error(err, "Failed to start user PostreSQL creation job") r.EventRecorder.Eventf(pg, models.Warning, models.CreationFailed, "User creation job is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(pg, models.Normal, models.Created, @@ -293,17 +295,17 @@ func (r *PostgreSQLReconciler) handleCreateCluster( err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *PostgreSQLReconciler) handleUpdateCluster( ctx context.Context, pg *v1beta1.PostgreSQL, logger logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { logger = logger.WithName("PostgreSQL update event") iData, err := r.API.GetPostgreSQL(pg.Status.ID) @@ -319,7 +321,7 @@ func (r *PostgreSQLReconciler) handleUpdateCluster( "Cluster fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } iPg, err := pg.FromInstAPI(iData) @@ -335,7 +337,7 @@ func (r *PostgreSQLReconciler) handleUpdateCluster( "Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } if iPg.Status.CurrentClusterOperationStatus != models.NoOperation { @@ -356,9 +358,9 @@ func (r *PostgreSQLReconciler) handleUpdateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } - return models.ReconcileRequeue + return reconcile.Result{}, err } if pg.Annotations[models.ExternalChangesAnnotation] == models.True { @@ -377,7 +379,7 @@ func (r *PostgreSQLReconciler) handleUpdateCluster( r.EventRecorder.Eventf(pg, models.Warning, models.UpdateFailed, "Cannot update cluster settings. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -413,9 +415,9 @@ func (r *PostgreSQLReconciler) handleUpdateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info( @@ -438,7 +440,7 @@ func (r *PostgreSQLReconciler) handleUpdateCluster( "Cluster configs fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } for _, iConfig := range iConfigs { @@ -459,7 +461,7 @@ func (r *PostgreSQLReconciler) handleUpdateCluster( "Cluster configs fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("PostgreSQL cluster configurations were updated", @@ -481,7 +483,7 @@ func (r *PostgreSQLReconciler) handleUpdateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("PostgreSQL cluster was updated", @@ -489,7 +491,7 @@ func (r *PostgreSQLReconciler) handleUpdateCluster( "cluster status", pg.Status.State, ) - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *PostgreSQLReconciler) createUser( @@ -748,7 +750,7 @@ func (r *PostgreSQLReconciler) handleUserEvent( } } -func (r *PostgreSQLReconciler) handleExternalChanges(pg, iPg *v1beta1.PostgreSQL, l logr.Logger) reconcile.Result { +func (r *PostgreSQLReconciler) handleExternalChanges(pg, iPg *v1beta1.PostgreSQL, l logr.Logger) (reconcile.Result, error) { if !pg.Spec.IsEqual(iPg.Spec) { l.Info(msgSpecStillNoMatch, "specification of k8s resource", pg.Spec, @@ -757,11 +759,11 @@ func (r *PostgreSQLReconciler) handleExternalChanges(pg, iPg *v1beta1.PostgreSQL if err != nil { l.Error(err, "Cannot create specification difference message", "instaclustr data", iPg.Spec, "k8s resource spec", pg.Spec) - return models.ExitReconcile + return models.ExitReconcile, nil } r.EventRecorder.Eventf(pg, models.Warning, models.ExternalChanges, msgDiffSpecs) - return models.ExitReconcile + return models.ExitReconcile, nil } patch := pg.NewPatch() @@ -776,20 +778,20 @@ func (r *PostgreSQLReconciler) handleExternalChanges(pg, iPg *v1beta1.PostgreSQL r.EventRecorder.Eventf(pg, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("External changes have been reconciled", "resource ID", pg.Status.ID) r.EventRecorder.Event(pg, models.Normal, models.ExternalChanges, "External changes have been reconciled") - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *PostgreSQLReconciler) handleDeleteCluster( ctx context.Context, pg *v1beta1.PostgreSQL, logger logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { logger = logger.WithName("PostgreSQL deletion event") _, err := r.API.GetPostgreSQL(pg.Status.ID) @@ -804,7 +806,7 @@ func (r *PostgreSQLReconciler) handleDeleteCluster( "Cluster resource fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } if !errors.Is(err, instaclustr.NotFound) { @@ -824,7 +826,7 @@ func (r *PostgreSQLReconciler) handleDeleteCluster( err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(pg, models.Normal, models.DeletionStarted, @@ -844,7 +846,7 @@ func (r *PostgreSQLReconciler) handleDeleteCluster( "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", pg.Status.ID) @@ -852,7 +854,7 @@ func (r *PostgreSQLReconciler) handleDeleteCluster( r.EventRecorder.Event(pg, models.Normal, models.DeletionStarted, "Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.") - return models.ExitReconcile + return models.ExitReconcile, nil } } @@ -874,7 +876,7 @@ func (r *PostgreSQLReconciler) handleDeleteCluster( "Cluster backups deletion is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("Cluster backup resources were deleted", @@ -893,7 +895,7 @@ func (r *PostgreSQLReconciler) handleDeleteCluster( for _, ref := range pg.Spec.UserRefs { err = r.handleUsersDetach(ctx, logger, pg, ref) if err != nil { - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -912,7 +914,7 @@ func (r *PostgreSQLReconciler) handleDeleteCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } err = r.deleteSecret(ctx, pg) @@ -926,7 +928,7 @@ func (r *PostgreSQLReconciler) handleDeleteCluster( "Default user secret deletion is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("Cluster PostgreSQL default user secret was deleted", @@ -946,7 +948,7 @@ func (r *PostgreSQLReconciler) handleDeleteCluster( "cluster name", pg.Spec.Name, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("PostgreSQL cluster was deleted", @@ -959,14 +961,14 @@ func (r *PostgreSQLReconciler) handleDeleteCluster( "Cluster resource is deleted", ) - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *PostgreSQLReconciler) handleUpdateDefaultUserPassword( ctx context.Context, pg *v1beta1.PostgreSQL, logger logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { logger = logger.WithName("PostgreSQL default user password updating event") secret, err := v1beta1.GetDefaultPgUserSecret(ctx, pg.Name, pg.Namespace, r.Client) @@ -982,7 +984,7 @@ func (r *PostgreSQLReconciler) handleUpdateDefaultUserPassword( err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } password := string(secret.Data[models.Password]) @@ -999,7 +1001,7 @@ func (r *PostgreSQLReconciler) handleUpdateDefaultUserPassword( err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } err = r.API.UpdatePostgreSQLDefaultUserPassword(pg.Status.ID, password) @@ -1015,7 +1017,7 @@ func (r *PostgreSQLReconciler) handleUpdateDefaultUserPassword( err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } pg.Annotations[models.ResourceStateAnnotation] = models.UpdatedEvent @@ -1031,7 +1033,7 @@ func (r *PostgreSQLReconciler) handleUpdateDefaultUserPassword( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("PostgreSQL default user password was updated", @@ -1044,7 +1046,7 @@ func (r *PostgreSQLReconciler) handleUpdateDefaultUserPassword( "Cluster default user password is updated", ) - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *PostgreSQLReconciler) startClusterStatusJob(pg *v1beta1.PostgreSQL) error { @@ -1644,6 +1646,8 @@ func (r *PostgreSQLReconciler) findSecretObject(secret client.Object) []reconcil // SetupWithManager sets up the controller with the Manager. func (r *PostgreSQLReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.PostgreSQL{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { if deleting := confirmDeletion(event.Object); deleting { diff --git a/controllers/clusters/redis_controller.go b/controllers/clusters/redis_controller.go index 0ee7b3eda..a46d687fb 100644 --- a/controllers/clusters/redis_controller.go +++ b/controllers/clusters/redis_controller.go @@ -29,6 +29,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" @@ -40,6 +41,7 @@ import ( "github.com/instaclustr/operator/pkg/exposeservice" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -83,11 +85,11 @@ func (r *RedisReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl switch redis.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreateCluster(ctx, redis, logger), nil + return r.handleCreateCluster(ctx, redis, logger) case models.UpdatingEvent: - return r.handleUpdateCluster(ctx, redis, logger), nil + return r.handleUpdateCluster(ctx, redis, logger) case models.DeletingEvent: - return r.handleDeleteCluster(ctx, redis, logger), nil + return r.handleDeleteCluster(ctx, redis, logger) case models.GenericEvent: logger.Info("Redis generic event isn't handled", "cluster name", redis.Spec.Name, @@ -107,7 +109,7 @@ func (r *RedisReconciler) handleCreateCluster( ctx context.Context, redis *v1beta1.Redis, logger logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { var err error if redis.Status.ID == "" { var id string @@ -128,7 +130,7 @@ func (r *RedisReconciler) handleCreateCluster( "Cluster restore from backup on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info( @@ -160,7 +162,7 @@ func (r *RedisReconciler) handleCreateCluster( "Cluster creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info( @@ -184,7 +186,7 @@ func (r *RedisReconciler) handleCreateCluster( r.EventRecorder.Eventf(redis, models.Warning, models.PatchFailed, "Cluster resource status patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("Redis resource has been created", @@ -207,7 +209,7 @@ func (r *RedisReconciler) handleCreateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } if redis.Status.State != models.DeletedStatus { @@ -222,7 +224,7 @@ func (r *RedisReconciler) handleCreateCluster( "Cluster status job creation is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -241,7 +243,7 @@ func (r *RedisReconciler) handleCreateCluster( "Cluster backups job creation is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -257,7 +259,7 @@ func (r *RedisReconciler) handleCreateCluster( r.EventRecorder.Eventf(redis, models.Warning, models.CreationFailed, "User creation job is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(redis, models.Normal, models.Created, @@ -274,7 +276,7 @@ func (r *RedisReconciler) handleCreateCluster( "namespace", redis.Namespace, ) - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *RedisReconciler) startUsersCreationJob(cluster *v1beta1.Redis) error { @@ -292,7 +294,7 @@ func (r *RedisReconciler) handleUpdateCluster( ctx context.Context, redis *v1beta1.Redis, logger logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { iData, err := r.API.GetRedis(redis.Status.ID) if err != nil { logger.Error( @@ -306,7 +308,7 @@ func (r *RedisReconciler) handleUpdateCluster( "Fetch cluster from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } iRedis, err := redis.FromInstAPI(iData) @@ -322,7 +324,7 @@ func (r *RedisReconciler) handleUpdateCluster( "Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } if redis.Annotations[models.ExternalChangesAnnotation] == models.True { @@ -341,7 +343,7 @@ func (r *RedisReconciler) handleUpdateCluster( r.EventRecorder.Eventf(redis, models.Warning, models.UpdateFailed, "Cannot update cluster settings. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -379,9 +381,9 @@ func (r *RedisReconciler) handleUpdateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -400,7 +402,7 @@ func (r *RedisReconciler) handleUpdateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info( @@ -410,7 +412,7 @@ func (r *RedisReconciler) handleUpdateCluster( "data centres", redis.Spec.DataCentres, ) - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *RedisReconciler) handleCreateUsers( @@ -473,7 +475,7 @@ func (r *RedisReconciler) handleCreateUsers( return nil } -func (r *RedisReconciler) handleExternalChanges(redis, iRedis *v1beta1.Redis, l logr.Logger) reconcile.Result { +func (r *RedisReconciler) handleExternalChanges(redis, iRedis *v1beta1.Redis, l logr.Logger) (reconcile.Result, error) { if !redis.Spec.IsEqual(iRedis.Spec) { l.Info(msgSpecStillNoMatch, "specification of k8s resource", redis.Spec, @@ -483,10 +485,10 @@ func (r *RedisReconciler) handleExternalChanges(redis, iRedis *v1beta1.Redis, l if err != nil { l.Error(err, "Cannot create specification difference message", "instaclustr data", iRedis.Spec, "k8s resource spec", redis.Spec) - return models.ExitReconcile + return models.ExitReconcile, nil } r.EventRecorder.Eventf(redis, models.Warning, models.ExternalChanges, msgDiffSpecs) - return models.ExitReconcile + return models.ExitReconcile, nil } patch := redis.NewPatch() @@ -501,20 +503,20 @@ func (r *RedisReconciler) handleExternalChanges(redis, iRedis *v1beta1.Redis, l r.EventRecorder.Eventf(redis, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("External changes have been reconciled", "resource ID", redis.Status.ID) r.EventRecorder.Event(redis, models.Normal, models.ExternalChanges, "External changes have been reconciled") - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *RedisReconciler) handleDeleteCluster( ctx context.Context, redis *v1beta1.Redis, logger logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { _, err := r.API.GetRedis(redis.Status.ID) if err != nil && !errors.Is(err, instaclustr.NotFound) { @@ -528,7 +530,7 @@ func (r *RedisReconciler) handleDeleteCluster( "Fetch cluster from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } if !errors.Is(err, instaclustr.NotFound) { @@ -548,7 +550,7 @@ func (r *RedisReconciler) handleDeleteCluster( "Cluster deletion on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(redis, models.Normal, models.DeletionStarted, @@ -568,7 +570,7 @@ func (r *RedisReconciler) handleDeleteCluster( "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", redis.Status.ID) @@ -576,7 +578,7 @@ func (r *RedisReconciler) handleDeleteCluster( r.EventRecorder.Event(redis, models.Normal, models.DeletionStarted, "Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.") - return models.ExitReconcile + return models.ExitReconcile, nil } } @@ -597,7 +599,7 @@ func (r *RedisReconciler) handleDeleteCluster( "Cluster backups deletion is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -622,7 +624,7 @@ func (r *RedisReconciler) handleDeleteCluster( "Cluster detaching on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -641,7 +643,7 @@ func (r *RedisReconciler) handleDeleteCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } err = exposeservice.Delete(r.Client, redis.Name, redis.Namespace) @@ -651,7 +653,7 @@ func (r *RedisReconciler) handleDeleteCluster( "cluster name", redis.Spec.Name, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } logger.Info("Redis cluster was deleted", @@ -664,7 +666,7 @@ func (r *RedisReconciler) handleDeleteCluster( "Cluster resource is deleted", ) - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *RedisReconciler) detachUserResource( @@ -1194,6 +1196,8 @@ func (r *RedisReconciler) deleteBackups(ctx context.Context, clusterID, namespac // SetupWithManager sets up the controller with the Manager. func (r *RedisReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.Redis{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { if deleting := confirmDeletion(event.Object); deleting { diff --git a/controllers/clusters/zookeeper_controller.go b/controllers/clusters/zookeeper_controller.go index d2252ce7b..ad3e0e781 100644 --- a/controllers/clusters/zookeeper_controller.go +++ b/controllers/clusters/zookeeper_controller.go @@ -27,6 +27,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" @@ -37,6 +38,7 @@ import ( "github.com/instaclustr/operator/pkg/exposeservice" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -73,16 +75,16 @@ func (r *ZookeeperReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( l.Error(err, "unable to fetch Zookeeper", "request", req) - return models.ReconcileRequeue, err + return reconcile.Result{}, err } switch zook.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreateCluster(ctx, zook, l), nil + return r.handleCreateCluster(ctx, zook, l) case models.UpdatingEvent: - return r.handleUpdateCluster(zook, l), nil + return r.handleUpdateCluster(zook, l) case models.DeletingEvent: - return r.handleDeleteCluster(ctx, zook, l), nil + return r.handleDeleteCluster(ctx, zook, l) case models.GenericEvent: l.Info("Generic event isn't handled", "cluster name", zook.Spec.Name, "request", req, "event", zook.Annotations[models.ResourceStateAnnotation]) @@ -101,7 +103,7 @@ func (r *ZookeeperReconciler) handleCreateCluster( ctx context.Context, zook *v1beta1.Zookeeper, l logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { var err error l = l.WithName("Creation Event") @@ -120,7 +122,7 @@ func (r *ZookeeperReconciler) handleCreateCluster( "Cluster creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -138,7 +140,7 @@ func (r *ZookeeperReconciler) handleCreateCluster( "Cluster resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } zook.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent @@ -151,7 +153,7 @@ func (r *ZookeeperReconciler) handleCreateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("Zookeeper cluster has been created", "cluster ID", zook.Status.ID) @@ -168,7 +170,7 @@ func (r *ZookeeperReconciler) handleCreateCluster( err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -182,7 +184,7 @@ func (r *ZookeeperReconciler) handleCreateCluster( "Cluster status check job creation is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -191,7 +193,7 @@ func (r *ZookeeperReconciler) handleCreateCluster( ) } - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *ZookeeperReconciler) createDefaultSecret(ctx context.Context, zk *v1beta1.Zookeeper, l logr.Logger) error { @@ -226,23 +228,23 @@ func (r *ZookeeperReconciler) createDefaultSecret(ctx context.Context, zk *v1bet func (r *ZookeeperReconciler) handleUpdateCluster( zook *v1beta1.Zookeeper, l logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { l = l.WithName("Update Event") if zook.Annotations[models.ExternalChangesAnnotation] == models.True { - r.handleExternalChanges(zook, l) + return r.handleExternalChanges(zook, l) } iData, err := r.API.GetZookeeper(zook.Status.ID) if err != nil { l.Error(err, "Cannot get cluster from the Instaclustr", "cluster ID", zook.Status.ID) - return models.ReconcileRequeue + return reconcile.Result{}, err } iZook, err := zook.FromInstAPI(iData) if err != nil { l.Error(err, "Cannot convert cluster from the Instaclustr API", "cluster ID", zook.Status.ID) - return models.ReconcileRequeue + return reconcile.Result{}, err } if zook.Spec.ClusterSettingsNeedUpdate(iZook.Spec.Cluster) { @@ -257,24 +259,24 @@ func (r *ZookeeperReconciler) handleUpdateCluster( r.EventRecorder.Eventf(zook, models.Warning, models.UpdateFailed, "Cannot update cluster settings. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } } - return models.ExitReconcile + return models.ExitReconcile, nil } -func (r *ZookeeperReconciler) handleExternalChanges(zook *v1beta1.Zookeeper, l logr.Logger) reconcile.Result { +func (r *ZookeeperReconciler) handleExternalChanges(zook *v1beta1.Zookeeper, l logr.Logger) (reconcile.Result, error) { iData, err := r.API.GetZookeeper(zook.Status.ID) if err != nil { l.Error(err, "Cannot get cluster from the Instaclustr", "cluster ID", zook.Status.ID) - return models.ReconcileRequeue + return reconcile.Result{}, err } iZook, err := zook.FromInstAPI(iData) if err != nil { l.Error(err, "Cannot convert cluster from the Instaclustr API", "cluster ID", zook.Status.ID) - return models.ReconcileRequeue + return reconcile.Result{}, err } if !zook.Spec.IsEqual(iZook.Spec) { @@ -286,11 +288,11 @@ func (r *ZookeeperReconciler) handleExternalChanges(zook *v1beta1.Zookeeper, l l if err != nil { l.Error(err, "Cannot create specification difference message", "instaclustr data", iZook.Spec, "k8s resource spec", zook.Spec) - return models.ExitReconcile + return models.ExitReconcile, nil } r.EventRecorder.Eventf(zook, models.Warning, models.ExternalChanges, msgDiffSpecs) - return models.ExitReconcile + return models.ExitReconcile, nil } patch := zook.NewPatch() @@ -305,20 +307,20 @@ func (r *ZookeeperReconciler) handleExternalChanges(zook *v1beta1.Zookeeper, l l r.EventRecorder.Eventf(zook, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("External changes have been reconciled", "resource ID", zook.Status.ID) r.EventRecorder.Event(zook, models.Normal, models.ExternalChanges, "External changes have been reconciled") - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *ZookeeperReconciler) handleDeleteCluster( ctx context.Context, zook *v1beta1.Zookeeper, l logr.Logger, -) reconcile.Result { +) (reconcile.Result, error) { l = l.WithName("Deletion Event") _, err := r.API.GetZookeeper(zook.Status.ID) @@ -331,7 +333,7 @@ func (r *ZookeeperReconciler) handleDeleteCluster( "Cluster resource fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } patch := zook.NewPatch() @@ -351,7 +353,7 @@ func (r *ZookeeperReconciler) handleDeleteCluster( "Cluster deletion is failed on the Instaclustr. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(zook, models.Normal, models.DeletionStarted, @@ -370,7 +372,7 @@ func (r *ZookeeperReconciler) handleDeleteCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", zook.Status.ID) @@ -378,7 +380,7 @@ func (r *ZookeeperReconciler) handleDeleteCluster( r.EventRecorder.Event(zook, models.Normal, models.DeletionStarted, "Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.") - return models.ExitReconcile + return models.ExitReconcile, nil } } @@ -394,7 +396,7 @@ func (r *ZookeeperReconciler) handleDeleteCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } err = exposeservice.Delete(r.Client, zook.Name, zook.Namespace) @@ -404,7 +406,7 @@ func (r *ZookeeperReconciler) handleDeleteCluster( "cluster name", zook.Spec.Name, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("Zookeeper cluster was deleted", @@ -416,7 +418,7 @@ func (r *ZookeeperReconciler) handleDeleteCluster( "Cluster resource is deleted", ) - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *ZookeeperReconciler) startClusterStatusJob(Zookeeper *v1beta1.Zookeeper) error { @@ -557,6 +559,8 @@ func (r *ZookeeperReconciler) handleExternalDelete(ctx context.Context, zook *v1 // SetupWithManager sets up the controller with the Manager. func (r *ZookeeperReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.Zookeeper{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { event.Object.GetAnnotations()[models.ResourceStateAnnotation] = models.CreatingEvent