Skip to content

Commit

Permalink
Add cluster status derive test (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Jun 4, 2021
1 parent 0585363 commit 8ce7525
Showing 1 changed file with 165 additions and 60 deletions.
225 changes: 165 additions & 60 deletions controllers/flinkcluster_updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
v1beta1 "github.com/spotify/flink-on-k8s-operator/api/v1beta1"
"gotest.tools/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand All @@ -36,73 +38,176 @@ func TestGetStatefulSetStateNotReady(t *testing.T) {
t, state == v1beta1.ComponentStateNotReady)
}

func TestGetStatefulSetStateReady(t *testing.T) {
var replicas int32 = 3
var statefulSet = appsv1.StatefulSet{
Spec: appsv1.StatefulSetSpec{Replicas: &replicas},
Status: appsv1.StatefulSetStatus{ReadyReplicas: 3},
}
var state = getStatefulSetState(&statefulSet)
assert.Assert(t, state == v1beta1.ComponentStateReady)
}

func TestIsStatusChangedFalse(t *testing.T) {
var oldStatus = v1beta1.FlinkClusterStatus{}
var newStatus = v1beta1.FlinkClusterStatus{}
var updater = &ClusterStatusUpdater{}
assert.Assert(t, updater.isStatusChanged(oldStatus, newStatus) == false)
}
func TestClusterStatus(t *testing.T) {
t.Run("not changed", func(t *testing.T) {
var oldStatus = v1beta1.FlinkClusterStatus{}
var newStatus = v1beta1.FlinkClusterStatus{}
var updater = &ClusterStatusUpdater{}
assert.Assert(t, updater.isStatusChanged(oldStatus, newStatus) == false)
})

func TestIsStatusChangedTrue(t *testing.T) {
var oldStatus = v1beta1.FlinkClusterStatus{
Components: v1beta1.FlinkClusterComponentsStatus{
JobManagerStatefulSet: v1beta1.FlinkClusterComponentState{
Name: "my-jobmanager",
State: "NotReady",
},
TaskManagerStatefulSet: v1beta1.FlinkClusterComponentState{
Name: "my-taskmanager",
State: "NotReady",
t.Run("changed", func(t *testing.T) {
var oldStatus = v1beta1.FlinkClusterStatus{
Components: v1beta1.FlinkClusterComponentsStatus{
JobManagerStatefulSet: v1beta1.FlinkClusterComponentState{
Name: "my-jobmanager",
State: "NotReady",
},
TaskManagerStatefulSet: v1beta1.FlinkClusterComponentState{
Name: "my-taskmanager",
State: "NotReady",
},
JobManagerService: v1beta1.JobManagerServiceStatus{
Name: "my-jobmanager",
State: "NotReady",
},
JobManagerIngress: &v1beta1.JobManagerIngressStatus{
Name: "my-jobmanager",
State: "NotReady",
},
Job: &v1beta1.JobStatus{
Name: "my-job",
State: "Pending",
},
},
JobManagerService: v1beta1.JobManagerServiceStatus{
Name: "my-jobmanager",
State: "NotReady",
State: "Creating"}
var newStatus = v1beta1.FlinkClusterStatus{
Components: v1beta1.FlinkClusterComponentsStatus{
JobManagerStatefulSet: v1beta1.FlinkClusterComponentState{
Name: "my-jobmanager",
State: "Ready",
},
TaskManagerStatefulSet: v1beta1.FlinkClusterComponentState{
Name: "my-taskmanager",
State: "Ready",
},
JobManagerService: v1beta1.JobManagerServiceStatus{
Name: "my-jobmanager",
State: "Ready",
},
JobManagerIngress: &v1beta1.JobManagerIngressStatus{
Name: "my-jobmanager",
State: "Ready",
URLs: []string{"http://my-jobmanager"},
},
Job: &v1beta1.JobStatus{
Name: "my-job",
State: "Running",
},
},
JobManagerIngress: &v1beta1.JobManagerIngressStatus{
Name: "my-jobmanager",
State: "NotReady",
},
Job: &v1beta1.JobStatus{
Name: "my-job",
State: "Pending",
State: "Creating"}
var updater = &ClusterStatusUpdater{log: log.Log}
assert.Assert(t, updater.isStatusChanged(oldStatus, newStatus))
})

t.Run("derive status", func(t *testing.T) {
currentRevision := "1"
nextRevision := "1-2"
var oldStatus = v1beta1.FlinkClusterStatus{
Components: v1beta1.FlinkClusterComponentsStatus{
JobManagerStatefulSet: v1beta1.FlinkClusterComponentState{
Name: "my-jobmanager",
State: v1beta1.ComponentStateReady,
},
TaskManagerStatefulSet: v1beta1.FlinkClusterComponentState{
Name: "my-taskmanager",
State: v1beta1.ComponentStateReady,
},
JobManagerService: v1beta1.JobManagerServiceStatus{
Name: "my-jobmanager",
State: v1beta1.ComponentStateReady,
},
JobManagerIngress: &v1beta1.JobManagerIngressStatus{
Name: "my-jobmanager",
State: v1beta1.ComponentStateNotReady,
},
Job: &v1beta1.JobStatus{
Name: "my-job",
State: v1beta1.JobStatePending,
},
},
},
State: "Creating"}
var newStatus = v1beta1.FlinkClusterStatus{
Components: v1beta1.FlinkClusterComponentsStatus{
JobManagerStatefulSet: v1beta1.FlinkClusterComponentState{
Name: "my-jobmanager",
State: "Ready",
State: v1beta1.ClusterStateCreating,
CurrentRevision: currentRevision,
NextRevision: nextRevision,
}

restart := v1beta1.JobRestartPolicyNever
replicas := int32(1)
var observed = ObservedClusterState{
jmStatefulSet: &appsv1.StatefulSet{
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
},
Status: appsv1.StatefulSetStatus{
ReadyReplicas: 1,
},
},
TaskManagerStatefulSet: v1beta1.FlinkClusterComponentState{
Name: "my-taskmanager",
State: "Ready",
jmService: &corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: "127.0.0.1",
},
},
JobManagerService: v1beta1.JobManagerServiceStatus{
Name: "my-jobmanager",
State: "Ready",
tmStatefulSet: &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "tm-service",
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
},
Status: appsv1.StatefulSetStatus{
ReadyReplicas: 1,
},
},
JobManagerIngress: &v1beta1.JobManagerIngressStatus{
Name: "my-jobmanager",
State: "Ready",
URLs: []string{"http://my-jobmanager"},
revisionStatus: &RevisionStatus{
currentRevision: &appsv1.ControllerRevision{
Revision: 1,
},
nextRevision: &appsv1.ControllerRevision{
Revision: 2,
},
},
Job: &v1beta1.JobStatus{
Name: "my-job",
State: "Running",
cluster: &v1beta1.FlinkCluster{
Spec: v1beta1.FlinkClusterSpec{
Job: &v1beta1.JobSpec{
RestartPolicy: &restart,
},
},
Status: v1beta1.FlinkClusterStatus{
Components: v1beta1.FlinkClusterComponentsStatus{
ConfigMap: v1beta1.FlinkClusterComponentState{
Name: "my-configmap",
State: v1beta1.ComponentStateReady,
},
JobManagerStatefulSet: v1beta1.FlinkClusterComponentState{
Name: "my-jobmanager",
State: v1beta1.ComponentStateReady,
},
JobManagerService: v1beta1.JobManagerServiceStatus{
Name: "my-jobmanager",
State: v1beta1.ComponentStateReady,
},
JobManagerIngress: &v1beta1.JobManagerIngressStatus{
State: "NotReady",
},
TaskManagerStatefulSet: v1beta1.FlinkClusterComponentState{
Name: "my-taskamanger",
State: v1beta1.ComponentStateReady,
},
Job: &v1beta1.JobStatus{
State: v1beta1.JobStateSucceeded,
},
},
CurrentRevision: currentRevision,
NextRevision: nextRevision,
},
},
},
State: "Creating"}
var updater = &ClusterStatusUpdater{log: log.Log}
assert.Assert(t, updater.isStatusChanged(oldStatus, newStatus))
}

var updater = &ClusterStatusUpdater{log: log.Log, observed: observed}
newStatus := updater.deriveClusterStatus(&oldStatus, &observed)
assert.Assert(t, updater.isStatusChanged(oldStatus, newStatus))
assert.Equal(t, newStatus.State, v1beta1.ClusterStateRunning)
})

}

0 comments on commit 8ce7525

Please sign in to comment.