diff --git a/controllers/flinkcluster/flinkcluster_controller.go b/controllers/flinkcluster/flinkcluster_controller.go index cdc80442..717d15ac 100644 --- a/controllers/flinkcluster/flinkcluster_controller.go +++ b/controllers/flinkcluster/flinkcluster_controller.go @@ -186,6 +186,11 @@ func (handler *FlinkClusterHandler) reconcile(ctx context.Context, } else { log.Info("Desired state", "ConfigMap", "nil") } + if desired.PodDisruptionBudget != nil { + log.Info("Desired state", "PodDisruptionBudget", *desired.PodDisruptionBudget) + } else { + log.Info("Desired state", "PodDisruptionBudget", "nil") + } if desired.JmStatefulSet != nil { log.Info("Desired state", "JobManager StatefulSet", *desired.JmStatefulSet) } else { diff --git a/controllers/flinkcluster/flinkcluster_converter.go b/controllers/flinkcluster/flinkcluster_converter.go index 0f882fea..ff6643cb 100644 --- a/controllers/flinkcluster/flinkcluster_converter.go +++ b/controllers/flinkcluster/flinkcluster_converter.go @@ -35,6 +35,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" + policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -57,6 +58,7 @@ const ( jobPyFilesUriEnvVar = "FLINK_JOB_PY_FILES_URI" hadoopConfDirEnvVar = "HADOOP_CONF_DIR" gacEnvVar = "GOOGLE_APPLICATION_CREDENTIALS" + maxUnavailableDefault = "0%" ) var ( @@ -87,7 +89,9 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste if !shouldCleanup(cluster, "ConfigMap") { state.ConfigMap = newConfigMap(cluster) } - + if !shouldCleanup(cluster, "PodDisruptionBudget") { + state.PodDisruptionBudget = newPodDisruptionBudget(cluster) + } if !shouldCleanup(cluster, "JobManagerStatefulSet") && !applicationMode { state.JmStatefulSet = newJobManagerStatefulSet(cluster) } @@ -523,6 +527,35 @@ func newTaskManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.State } } +// Gets the desired PodDisruptionBudget. +func newPodDisruptionBudget(flinkCluster *v1beta1.FlinkCluster) *policyv1.PodDisruptionBudget { + var jobSpec = flinkCluster.Spec.Job + if jobSpec == nil { + return nil + } + var clusterNamespace = flinkCluster.Namespace + var clusterName = flinkCluster.Name + var pdbName = getPodDisruptionBudgetName(clusterName) + var labels = getClusterLabels(flinkCluster) + + var maxUnavailablePods = intstr.FromString(maxUnavailableDefault) + + return &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: clusterNamespace, + Name: pdbName, + OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)}, + Labels: labels, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MaxUnavailable: &maxUnavailablePods, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + }, + } +} + // Gets the desired configMap. func newConfigMap(flinkCluster *v1beta1.FlinkCluster) *corev1.ConfigMap { appVersion, _ := version.NewVersion(flinkCluster.Spec.FlinkVersion) diff --git a/controllers/flinkcluster/flinkcluster_observer.go b/controllers/flinkcluster/flinkcluster_observer.go index 310a4623..bc04785e 100644 --- a/controllers/flinkcluster/flinkcluster_observer.go +++ b/controllers/flinkcluster/flinkcluster_observer.go @@ -31,6 +31,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" + policyv1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" @@ -58,6 +59,7 @@ type ObservedClusterState struct { jmService *corev1.Service jmIngress *networkingv1.Ingress tmStatefulSet *appsv1.StatefulSet + podDisruptionBudget *policyv1.PodDisruptionBudget persistentVolumeClaims *corev1.PersistentVolumeClaimList flinkJob FlinkJob flinkJobSubmitter FlinkJobSubmitter @@ -175,6 +177,21 @@ func (observer *ClusterStateObserver) observe( observed.configMap = observedConfigMap } + // PodDisruptionBudget. + var observedPodDisruptionBudget = new(policyv1.PodDisruptionBudget) + err = observer.observePodDisruptionBudget(observedPodDisruptionBudget) + if err != nil { + if client.IgnoreNotFound(err) != nil { + log.Error(err, "Failed to get PodDisruptionBudget") + return err + } + log.Info("Observed PodDisruptionBudget", "state", "nil") + observedPodDisruptionBudget = nil + } else { + log.Info("Observed PodDisruptionBudget", "state", *observedPodDisruptionBudget) + observed.podDisruptionBudget = observedPodDisruptionBudget + } + // JobManager StatefulSet. var observedJmStatefulSet = new(appsv1.StatefulSet) err = observer.observeJobManagerStatefulSet(observedJmStatefulSet) @@ -427,6 +444,20 @@ func (observer *ClusterStateObserver) observeRevisions( return err } +func (observer *ClusterStateObserver) observePodDisruptionBudget( + observedPodDisruptionBudget *policyv1.PodDisruptionBudget) error { + var clusterNamespace = observer.request.Namespace + var clusterName = observer.request.Name + + return observer.k8sClient.Get( + observer.context, + types.NamespacedName{ + Namespace: clusterNamespace, + Name: getPodDisruptionBudgetName(clusterName), + }, + observedPodDisruptionBudget) +} + func (observer *ClusterStateObserver) observeConfigMap( observedConfigMap *corev1.ConfigMap) error { var clusterNamespace = observer.request.Namespace diff --git a/controllers/flinkcluster/flinkcluster_reconciler.go b/controllers/flinkcluster/flinkcluster_reconciler.go index 5976b32f..470ed0a9 100644 --- a/controllers/flinkcluster/flinkcluster_reconciler.go +++ b/controllers/flinkcluster/flinkcluster_reconciler.go @@ -40,6 +40,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" + policyv1 "k8s.io/api/policy/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -85,6 +86,11 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) { return ctrl.Result{}, err } + err = reconciler.reconcilePodDisruptionBudget() + if err != nil { + return ctrl.Result{}, err + } + err = reconciler.reconcileJobManagerStatefulSet() if err != nil { return ctrl.Result{}, err @@ -463,6 +469,54 @@ func (reconciler *ClusterReconciler) deleteConfigMap( return err } +func (reconciler *ClusterReconciler) reconcilePodDisruptionBudget() error { + var desiredPodDisruptionBudget = reconciler.desired.PodDisruptionBudget + var observedPodDisruptionBudget = reconciler.observed.podDisruptionBudget + + if desiredPodDisruptionBudget != nil && observedPodDisruptionBudget == nil { + return reconciler.createPodDisruptionBudget(desiredPodDisruptionBudget, "PodDisruptionBudget") + } + + if desiredPodDisruptionBudget == nil && observedPodDisruptionBudget != nil { + return reconciler.deletePodDisruptionBudget(observedPodDisruptionBudget, "PodDisruptionBudget") + } + + return nil +} + +func (reconciler *ClusterReconciler) createPodDisruptionBudget( + pdb *policyv1.PodDisruptionBudget, component string) error { + var context = reconciler.context + var log = reconciler.log.WithValues("component", component) + var k8sClient = reconciler.k8sClient + + log.Info("Creating PodDisruptionBudget", "PodDisruptionBudget", *pdb) + var err = k8sClient.Create(context, pdb) + if err != nil { + log.Info("Failed to create PodDisruptionBudget", "error", err) + } else { + log.Info("PodDisruptionBudget created") + } + return err +} + +func (reconciler *ClusterReconciler) deletePodDisruptionBudget( + pdb *policyv1.PodDisruptionBudget, component string) error { + var context = reconciler.context + var log = reconciler.log.WithValues("component", component) + var k8sClient = reconciler.k8sClient + + log.Info("Deleting PodDisruptionBudget", "PodDisruptionBudget", pdb) + var err = k8sClient.Delete(context, pdb) + err = client.IgnoreNotFound(err) + if err != nil { + log.Error(err, "Failed to delete PodDisruptionBudget") + } else { + log.Info("PodDisruptionBudget deleted") + } + return err +} + func (reconciler *ClusterReconciler) reconcilePersistentVolumeClaims() error { observed := reconciler.observed pvcs := observed.persistentVolumeClaims diff --git a/controllers/flinkcluster/flinkcluster_util.go b/controllers/flinkcluster/flinkcluster_util.go index 7b5b471e..d9bc1aab 100644 --- a/controllers/flinkcluster/flinkcluster_util.go +++ b/controllers/flinkcluster/flinkcluster_util.go @@ -35,6 +35,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" + policyv1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -96,6 +97,11 @@ func getConfigMapName(clusterName string) string { return clusterName + "-configmap" } +// Gets PodDisruptionBudgetName name +func getPodDisruptionBudgetName(clusterName string) string { + return "flink-" + clusterName +} + // Gets JobManager StatefulSet name func getJobManagerStatefulSetName(clusterName string) string { return clusterName + "-jobmanager" @@ -338,6 +344,10 @@ func isComponentUpdated(component runtime.Object, cluster *v1beta1.FlinkCluster) if o == nil { return false } + case *policyv1.PodDisruptionBudget: + if o == nil { + return false + } case *corev1.Service: if o == nil { return false @@ -373,6 +383,7 @@ func areComponentsUpdated(components []runtime.Object, cluster *v1beta1.FlinkClu func isUpdatedAll(observed ObservedClusterState) bool { components := []runtime.Object{ observed.configMap, + observed.podDisruptionBudget, observed.jmStatefulSet, observed.tmStatefulSet, observed.jmService, @@ -389,6 +400,7 @@ func isClusterUpdateToDate(observed *ObservedClusterState) bool { } components := []runtime.Object{ observed.configMap, + observed.podDisruptionBudget, observed.jmStatefulSet, observed.tmStatefulSet, observed.jmService, diff --git a/controllers/flinkcluster/flinkcluster_util_test.go b/controllers/flinkcluster/flinkcluster_util_test.go index ae341483..32da9771 100644 --- a/controllers/flinkcluster/flinkcluster_util_test.go +++ b/controllers/flinkcluster/flinkcluster_util_test.go @@ -24,6 +24,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" + policyv1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -263,11 +264,12 @@ func TestGetUpdateState(t *testing.T) { Components: v1beta1.FlinkClusterComponentsStatus{Job: &v1beta1.JobStatus{State: v1beta1.JobStateRunning}}, Revision: v1beta1.RevisionStatus{CurrentRevision: "cluster-85dc8f749-2", NextRevision: "cluster-aa5e3a87z-3"}}, }, - flinkJobSubmitter: FlinkJobSubmitter{job: &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}}, - configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}, - jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}, - tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}, - jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}, + flinkJobSubmitter: FlinkJobSubmitter{job: &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}}, + configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}, + podDisruptionBudget: &policyv1.PodDisruptionBudget{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}, + jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}, + tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}, + jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}, } var state = getUpdateState(&observed) assert.Equal(t, state, UpdateStatePreparing) @@ -297,12 +299,13 @@ func TestGetUpdateState(t *testing.T) { }, Status: v1beta1.FlinkClusterStatus{Revision: v1beta1.RevisionStatus{CurrentRevision: "cluster-85dc8f749-2", NextRevision: "cluster-aa5e3a87z-3"}}, }, - flinkJobSubmitter: FlinkJobSubmitter{job: &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}}, - configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}, - jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}, - tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}, - jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}, - jmIngress: &networkingv1.Ingress{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}, + flinkJobSubmitter: FlinkJobSubmitter{job: &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}}, + configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}, + podDisruptionBudget: &policyv1.PodDisruptionBudget{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}, + jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}, + tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}, + jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}, + jmIngress: &networkingv1.Ingress{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}, } state = getUpdateState(&observed) assert.Equal(t, state, UpdateStateFinished) diff --git a/internal/model/model.go b/internal/model/model.go index 8dcce19d..c7c165e0 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -18,14 +18,16 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" + policyv1 "k8s.io/api/policy/v1" ) // DesiredClusterState holds desired state of a cluster. type DesiredClusterState struct { - JmStatefulSet *appsv1.StatefulSet - JmService *corev1.Service - JmIngress *networkingv1.Ingress - TmStatefulSet *appsv1.StatefulSet - ConfigMap *corev1.ConfigMap - Job *batchv1.Job + JmStatefulSet *appsv1.StatefulSet + JmService *corev1.Service + JmIngress *networkingv1.Ingress + TmStatefulSet *appsv1.StatefulSet + ConfigMap *corev1.ConfigMap + Job *batchv1.Job + PodDisruptionBudget *policyv1.PodDisruptionBudget }