Skip to content

Commit

Permalink
Add PodDisruptionBudget (#353)
Browse files Browse the repository at this point in the history
Co-authored-by: Barna Kutassy <[email protected]>
  • Loading branch information
Barnabas Kutassy and Barna Kutassy authored Apr 5, 2022
1 parent fab2f6b commit 7052c67
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 18 deletions.
5 changes: 5 additions & 0 deletions controllers/flinkcluster/flinkcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ func (handler *FlinkClusterHandler) reconcile(ctx context.Context,
} else {
log.Info("Desired state", "ConfigMap", "nil")
}
if desired.PodDisruptionBudget != nil {
log.Info("Desired state", "PodDisruptionBudget", *desired.PodDisruptionBudget)
} else {
log.Info("Desired state", "PodDisruptionBudget", "nil")
}
if desired.JmStatefulSet != nil {
log.Info("Desired state", "JobManager StatefulSet", *desired.JmStatefulSet)
} else {
Expand Down
35 changes: 34 additions & 1 deletion controllers/flinkcluster/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

Expand All @@ -57,6 +58,7 @@ const (
jobPyFilesUriEnvVar = "FLINK_JOB_PY_FILES_URI"
hadoopConfDirEnvVar = "HADOOP_CONF_DIR"
gacEnvVar = "GOOGLE_APPLICATION_CREDENTIALS"
maxUnavailableDefault = "0%"
)

var (
Expand Down Expand Up @@ -87,7 +89,9 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste
if !shouldCleanup(cluster, "ConfigMap") {
state.ConfigMap = newConfigMap(cluster)
}

if !shouldCleanup(cluster, "PodDisruptionBudget") {
state.PodDisruptionBudget = newPodDisruptionBudget(cluster)
}
if !shouldCleanup(cluster, "JobManagerStatefulSet") && !applicationMode {
state.JmStatefulSet = newJobManagerStatefulSet(cluster)
}
Expand Down Expand Up @@ -523,6 +527,35 @@ func newTaskManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.State
}
}

// Gets the desired PodDisruptionBudget.
func newPodDisruptionBudget(flinkCluster *v1beta1.FlinkCluster) *policyv1.PodDisruptionBudget {
var jobSpec = flinkCluster.Spec.Job
if jobSpec == nil {
return nil
}
var clusterNamespace = flinkCluster.Namespace
var clusterName = flinkCluster.Name
var pdbName = getPodDisruptionBudgetName(clusterName)
var labels = getClusterLabels(flinkCluster)

var maxUnavailablePods = intstr.FromString(maxUnavailableDefault)

return &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Namespace: clusterNamespace,
Name: pdbName,
OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
Labels: labels,
},
Spec: policyv1.PodDisruptionBudgetSpec{
MaxUnavailable: &maxUnavailablePods,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
},
}
}

// Gets the desired configMap.
func newConfigMap(flinkCluster *v1beta1.FlinkCluster) *corev1.ConfigMap {
appVersion, _ := version.NewVersion(flinkCluster.Spec.FlinkVersion)
Expand Down
31 changes: 31 additions & 0 deletions controllers/flinkcluster/flinkcluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -58,6 +59,7 @@ type ObservedClusterState struct {
jmService *corev1.Service
jmIngress *networkingv1.Ingress
tmStatefulSet *appsv1.StatefulSet
podDisruptionBudget *policyv1.PodDisruptionBudget
persistentVolumeClaims *corev1.PersistentVolumeClaimList
flinkJob FlinkJob
flinkJobSubmitter FlinkJobSubmitter
Expand Down Expand Up @@ -175,6 +177,21 @@ func (observer *ClusterStateObserver) observe(
observed.configMap = observedConfigMap
}

// PodDisruptionBudget.
var observedPodDisruptionBudget = new(policyv1.PodDisruptionBudget)
err = observer.observePodDisruptionBudget(observedPodDisruptionBudget)
if err != nil {
if client.IgnoreNotFound(err) != nil {
log.Error(err, "Failed to get PodDisruptionBudget")
return err
}
log.Info("Observed PodDisruptionBudget", "state", "nil")
observedPodDisruptionBudget = nil
} else {
log.Info("Observed PodDisruptionBudget", "state", *observedPodDisruptionBudget)
observed.podDisruptionBudget = observedPodDisruptionBudget
}

// JobManager StatefulSet.
var observedJmStatefulSet = new(appsv1.StatefulSet)
err = observer.observeJobManagerStatefulSet(observedJmStatefulSet)
Expand Down Expand Up @@ -427,6 +444,20 @@ func (observer *ClusterStateObserver) observeRevisions(
return err
}

func (observer *ClusterStateObserver) observePodDisruptionBudget(
observedPodDisruptionBudget *policyv1.PodDisruptionBudget) error {
var clusterNamespace = observer.request.Namespace
var clusterName = observer.request.Name

return observer.k8sClient.Get(
observer.context,
types.NamespacedName{
Namespace: clusterNamespace,
Name: getPodDisruptionBudgetName(clusterName),
},
observedPodDisruptionBudget)
}

func (observer *ClusterStateObserver) observeConfigMap(
observedConfigMap *corev1.ConfigMap) error {
var clusterNamespace = observer.request.Namespace
Expand Down
54 changes: 54 additions & 0 deletions controllers/flinkcluster/flinkcluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -85,6 +86,11 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) {
return ctrl.Result{}, err
}

err = reconciler.reconcilePodDisruptionBudget()
if err != nil {
return ctrl.Result{}, err
}

err = reconciler.reconcileJobManagerStatefulSet()
if err != nil {
return ctrl.Result{}, err
Expand Down Expand Up @@ -463,6 +469,54 @@ func (reconciler *ClusterReconciler) deleteConfigMap(
return err
}

func (reconciler *ClusterReconciler) reconcilePodDisruptionBudget() error {
var desiredPodDisruptionBudget = reconciler.desired.PodDisruptionBudget
var observedPodDisruptionBudget = reconciler.observed.podDisruptionBudget

if desiredPodDisruptionBudget != nil && observedPodDisruptionBudget == nil {
return reconciler.createPodDisruptionBudget(desiredPodDisruptionBudget, "PodDisruptionBudget")
}

if desiredPodDisruptionBudget == nil && observedPodDisruptionBudget != nil {
return reconciler.deletePodDisruptionBudget(observedPodDisruptionBudget, "PodDisruptionBudget")
}

return nil
}

func (reconciler *ClusterReconciler) createPodDisruptionBudget(
pdb *policyv1.PodDisruptionBudget, component string) error {
var context = reconciler.context
var log = reconciler.log.WithValues("component", component)
var k8sClient = reconciler.k8sClient

log.Info("Creating PodDisruptionBudget", "PodDisruptionBudget", *pdb)
var err = k8sClient.Create(context, pdb)
if err != nil {
log.Info("Failed to create PodDisruptionBudget", "error", err)
} else {
log.Info("PodDisruptionBudget created")
}
return err
}

func (reconciler *ClusterReconciler) deletePodDisruptionBudget(
pdb *policyv1.PodDisruptionBudget, component string) error {
var context = reconciler.context
var log = reconciler.log.WithValues("component", component)
var k8sClient = reconciler.k8sClient

log.Info("Deleting PodDisruptionBudget", "PodDisruptionBudget", pdb)
var err = k8sClient.Delete(context, pdb)
err = client.IgnoreNotFound(err)
if err != nil {
log.Error(err, "Failed to delete PodDisruptionBudget")
} else {
log.Info("PodDisruptionBudget deleted")
}
return err
}

func (reconciler *ClusterReconciler) reconcilePersistentVolumeClaims() error {
observed := reconciler.observed
pvcs := observed.persistentVolumeClaims
Expand Down
12 changes: 12 additions & 0 deletions controllers/flinkcluster/flinkcluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -96,6 +97,11 @@ func getConfigMapName(clusterName string) string {
return clusterName + "-configmap"
}

// Gets PodDisruptionBudgetName name
func getPodDisruptionBudgetName(clusterName string) string {
return "flink-" + clusterName
}

// Gets JobManager StatefulSet name
func getJobManagerStatefulSetName(clusterName string) string {
return clusterName + "-jobmanager"
Expand Down Expand Up @@ -338,6 +344,10 @@ func isComponentUpdated(component runtime.Object, cluster *v1beta1.FlinkCluster)
if o == nil {
return false
}
case *policyv1.PodDisruptionBudget:
if o == nil {
return false
}
case *corev1.Service:
if o == nil {
return false
Expand Down Expand Up @@ -373,6 +383,7 @@ func areComponentsUpdated(components []runtime.Object, cluster *v1beta1.FlinkClu
func isUpdatedAll(observed ObservedClusterState) bool {
components := []runtime.Object{
observed.configMap,
observed.podDisruptionBudget,
observed.jmStatefulSet,
observed.tmStatefulSet,
observed.jmService,
Expand All @@ -389,6 +400,7 @@ func isClusterUpdateToDate(observed *ObservedClusterState) bool {
}
components := []runtime.Object{
observed.configMap,
observed.podDisruptionBudget,
observed.jmStatefulSet,
observed.tmStatefulSet,
observed.jmService,
Expand Down
25 changes: 14 additions & 11 deletions controllers/flinkcluster/flinkcluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -263,11 +264,12 @@ func TestGetUpdateState(t *testing.T) {
Components: v1beta1.FlinkClusterComponentsStatus{Job: &v1beta1.JobStatus{State: v1beta1.JobStateRunning}},
Revision: v1beta1.RevisionStatus{CurrentRevision: "cluster-85dc8f749-2", NextRevision: "cluster-aa5e3a87z-3"}},
},
flinkJobSubmitter: FlinkJobSubmitter{job: &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}},
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
flinkJobSubmitter: FlinkJobSubmitter{job: &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}}},
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
podDisruptionBudget: &policyv1.PodDisruptionBudget{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
}
var state = getUpdateState(&observed)
assert.Equal(t, state, UpdateStatePreparing)
Expand Down Expand Up @@ -297,12 +299,13 @@ func TestGetUpdateState(t *testing.T) {
},
Status: v1beta1.FlinkClusterStatus{Revision: v1beta1.RevisionStatus{CurrentRevision: "cluster-85dc8f749-2", NextRevision: "cluster-aa5e3a87z-3"}},
},
flinkJobSubmitter: FlinkJobSubmitter{job: &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}},
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
jmIngress: &networkingv1.Ingress{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
flinkJobSubmitter: FlinkJobSubmitter{job: &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}}},
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
podDisruptionBudget: &policyv1.PodDisruptionBudget{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
jmIngress: &networkingv1.Ingress{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
}
state = getUpdateState(&observed)
assert.Equal(t, state, UpdateStateFinished)
Expand Down
14 changes: 8 additions & 6 deletions internal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
)

// DesiredClusterState holds desired state of a cluster.
type DesiredClusterState struct {
JmStatefulSet *appsv1.StatefulSet
JmService *corev1.Service
JmIngress *networkingv1.Ingress
TmStatefulSet *appsv1.StatefulSet
ConfigMap *corev1.ConfigMap
Job *batchv1.Job
JmStatefulSet *appsv1.StatefulSet
JmService *corev1.Service
JmIngress *networkingv1.Ingress
TmStatefulSet *appsv1.StatefulSet
ConfigMap *corev1.ConfigMap
Job *batchv1.Job
PodDisruptionBudget *policyv1.PodDisruptionBudget
}

0 comments on commit 7052c67

Please sign in to comment.