Skip to content

Commit

Permalink
Cleanup and remove unused (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored May 12, 2021
1 parent fc4f4da commit de5c431
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 60 deletions.
6 changes: 3 additions & 3 deletions api/v1beta1/flinkcluster_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (v *Validator) ValidateUpdate(old *FlinkCluster, new *FlinkCluster) error {
}

func (v *Validator) checkControlAnnotations(old *FlinkCluster, new *FlinkCluster) error {
oldUserControl, _ := old.Annotations[ControlAnnotation]
oldUserControl := old.Annotations[ControlAnnotation]
newUserControl, ok := new.Annotations[ControlAnnotation]
if ok {
if oldUserControl != newUserControl && old.Status.Control != nil && old.Status.Control.State == ControlStateProgressing {
Expand Down Expand Up @@ -248,10 +248,10 @@ func (v *Validator) validateHadoopConfig(hadoopConfig *HadoopConfig) error {
return nil
}
if len(hadoopConfig.ConfigMapName) == 0 {
return fmt.Errorf("Hadoop ConfigMap name is unspecified")
return fmt.Errorf("hadoop ConfigMap name is unspecified")
}
if len(hadoopConfig.MountPath) == 0 {
return fmt.Errorf("Hadoop config volume mount path is unspecified")
return fmt.Errorf("hadoop config volume mount path is unspecified")
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions api/v1beta1/flinkcluster_validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ func TestInvalidHadoopConfig(t *testing.T) {
MountPath: "/etc/hadoop/conf",
}
var err1 = validator.validateHadoopConfig(&hadoopConfig1)
var expectedErr1 = "Hadoop ConfigMap name is unspecified"
var expectedErr1 = "hadoop ConfigMap name is unspecified"
assert.Assert(t, err1 != nil, "err is not expected to be nil")
assert.Equal(t, err1.Error(), expectedErr1)

Expand All @@ -759,7 +759,7 @@ func TestInvalidHadoopConfig(t *testing.T) {
MountPath: "",
}
var err2 = validator.validateHadoopConfig(&hadoopConfig2)
var expectedErr2 = "Hadoop config volume mount path is unspecified"
var expectedErr2 = "hadoop config volume mount path is unspecified"
assert.Assert(t, err2 != nil, "err is not expected to be nil")
assert.Equal(t, err2.Error(), expectedErr2)
}
Expand Down
12 changes: 6 additions & 6 deletions controllers/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func getDesiredJobManagerStatefulSet(
}

envVars = append(envVars, flinkCluster.Spec.EnvVars...)
var containers = []corev1.Container{corev1.Container{
var containers = []corev1.Container{{
Name: "jobmanager",
Image: imageSpec.Name,
ImagePullPolicy: imageSpec.PullPolicy,
Expand Down Expand Up @@ -334,7 +334,7 @@ func getDesiredJobManagerIngress(
if jobManagerIngressSpec.HostFormat != nil {
ingressHost = getJobManagerIngressHost(*jobManagerIngressSpec.HostFormat, clusterName)
}
if jobManagerIngressSpec.UseTLS != nil && *jobManagerIngressSpec.UseTLS == true {
if jobManagerIngressSpec.UseTLS != nil && *jobManagerIngressSpec.UseTLS {
var secretName string
var hosts []string
if ingressHost != "" {
Expand Down Expand Up @@ -488,7 +488,7 @@ func getDesiredTaskManagerStatefulSet(
}
envVars = append(envVars, flinkCluster.Spec.EnvVars...)

var containers = []corev1.Container{corev1.Container{
var containers = []corev1.Container{{
Name: "taskmanager",
Image: imageSpec.Name,
ImagePullPolicy: imageSpec.PullPolicy,
Expand Down Expand Up @@ -657,7 +657,7 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
}

if jobSpec.AllowNonRestoredState != nil &&
*jobSpec.AllowNonRestoredState == true {
*jobSpec.AllowNonRestoredState {
jobArgs = append(jobArgs, "--allowNonRestoredState")
}

Expand All @@ -666,7 +666,7 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
}

if jobSpec.NoLoggingToStdout != nil &&
*jobSpec.NoLoggingToStdout == true {
*jobSpec.NoLoggingToStdout {
jobArgs = append(jobArgs, "--sysoutLogging")
}
jobArgs = append(jobArgs, "--detached")
Expand Down Expand Up @@ -737,7 +737,7 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
var podSpec = corev1.PodSpec{
InitContainers: convertJobInitContainers(jobSpec, saMount, saEnv),
Containers: []corev1.Container{
corev1.Container{
{
Name: "main",
Image: imageSpec.Name,
ImagePullPolicy: imageSpec.PullPolicy,
Expand Down
49 changes: 24 additions & 25 deletions controllers/flinkcluster_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -247,16 +246,16 @@ func TestGetDesiredClusterState(t *testing.T) {
VolumeMounts: []corev1.VolumeMount{
{Name: "cache-volume", MountPath: "/cache"},
},
VolumeClaimTemplates: []v1.PersistentVolumeClaim{
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-test",
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
Resources: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceStorage: resource.MustParse("100Gi"),
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceStorage: resource.MustParse("100Gi"),
},
},
StorageClassName: &storageClassName,
Expand Down Expand Up @@ -499,14 +498,14 @@ func TestGetDesiredClusterState(t *testing.T) {
},
},
},
Spec: v1.ServiceSpec{
Spec: corev1.ServiceSpec{
Type: "LoadBalancer",
Selector: map[string]string{
"app": "flink",
"cluster": "flinkjobcluster-sample",
"component": "jobmanager",
},
Ports: []v1.ServicePort{
Ports: []corev1.ServicePort{
{Name: "rpc", Port: 6123, TargetPort: intstr.FromString("rpc")},
{Name: "blob", Port: 6124, TargetPort: intstr.FromString("blob")},
{Name: "query", Port: 6125, TargetPort: intstr.FromString("query")},
Expand Down Expand Up @@ -604,16 +603,16 @@ func TestGetDesiredClusterState(t *testing.T) {
"component": "taskmanager",
},
},
VolumeClaimTemplates: []v1.PersistentVolumeClaim{
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-test",
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
Resources: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceStorage: resource.MustParse("100Gi"),
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceStorage: resource.MustParse("100Gi"),
},
},
StorageClassName: &storageClassName,
Expand All @@ -634,7 +633,7 @@ func TestGetDesiredClusterState(t *testing.T) {
Spec: corev1.PodSpec{
InitContainers: make([]corev1.Container, 0),
Containers: []corev1.Container{
corev1.Container{
{
Name: "taskmanager",
Image: "flink:1.8.1",
Args: []string{"taskmanager"},
Expand Down Expand Up @@ -695,7 +694,7 @@ func TestGetDesiredClusterState(t *testing.T) {
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
},
VolumeMounts: []v1.VolumeMount{
VolumeMounts: []corev1.VolumeMount{
{Name: "cache-volume", MountPath: "/cache"},
{Name: "flink-config-volume", MountPath: "/opt/flink/conf"},
{
Expand All @@ -710,7 +709,7 @@ func TestGetDesiredClusterState(t *testing.T) {
},
},
},
corev1.Container{Name: "sidecar", Image: "alpine"},
{Name: "sidecar", Image: "alpine"},
},
Volumes: []corev1.Volume{
{
Expand Down Expand Up @@ -786,7 +785,7 @@ func TestGetDesiredClusterState(t *testing.T) {
},
},
Spec: batchv1.JobSpec{
Template: v1.PodTemplateSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "flink",
Expand All @@ -796,16 +795,16 @@ func TestGetDesiredClusterState(t *testing.T) {
"example.com": "example",
},
},
Spec: v1.PodSpec{
InitContainers: []v1.Container{
Spec: corev1.PodSpec{
InitContainers: []corev1.Container{
{
Name: "gcs-downloader",
Image: "google/cloud-sdk",
Command: []string{"gsutil"},
Args: []string{
"cp", "gs://my-bucket/my-job.jar", "/cache/my-job.jar",
},
VolumeMounts: []v1.VolumeMount{
VolumeMounts: []corev1.VolumeMount{
{Name: "cache-volume", MountPath: "/cache"},
{
Name: "gcp-service-account-volume",
Expand All @@ -821,7 +820,7 @@ func TestGetDesiredClusterState(t *testing.T) {
},
},
},
Containers: []v1.Container{
Containers: []corev1.Container{
{
Name: "main",
Image: "flink:1.8.1",
Expand All @@ -839,7 +838,7 @@ func TestGetDesiredClusterState(t *testing.T) {
"--input",
"./README.txt",
},
Env: []v1.EnvVar{
Env: []corev1.EnvVar{
{Name: "FLINK_JM_ADDR", Value: "flinkjobcluster-sample-jobmanager:8081"},
{Name: "HADOOP_CONF_DIR", Value: "/etc/hadoop/conf"},
{
Expand Down Expand Up @@ -867,7 +866,7 @@ func TestGetDesiredClusterState(t *testing.T) {
corev1.ResourceMemory: resource.MustParse("512Mi"),
},
},
VolumeMounts: []v1.VolumeMount{
VolumeMounts: []corev1.VolumeMount{
{Name: "cache-volume", MountPath: "/cache"},
{
Name: "flink-config-volume",
Expand Down
5 changes: 1 addition & 4 deletions controllers/flinkcluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,8 @@ func (observer *ClusterStateObserver) observeJob(
var jobInPendingState = recordedJobStatus != nil && recordedJobStatus.State == v1beta1.JobStatePending
var flinkJobID string
if jobSubmitCompleted && jobInPendingState {
var observedJobPod *corev1.Pod

// Get job submitter pod resource.
observedJobPod = new(corev1.Pod)
var observedJobPod *corev1.Pod = new(corev1.Pod)
err = observer.observeJobPod(observedJobPod)
if err != nil {
log.Error(err, "Failed to get job pod")
Expand Down Expand Up @@ -350,7 +348,6 @@ func (observer *ClusterStateObserver) observeFlinkJobStatus(
log.Info("Observed Flink job", "flink job", *flinkJob)
}

return
}

func (observer *ClusterStateObserver) observeSavepoint(observed *ObservedClusterState) error {
Expand Down
17 changes: 4 additions & 13 deletions controllers/flinkcluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func getSavepointEvent(status v1beta1.SavepointStatus) (eventType string, eventR
case v1beta1.SavepointStateSucceeded:
eventType = corev1.EventTypeNormal
eventReason = "SavepointCreated"
eventMessage = fmt.Sprintf("Successfully savepoint created")
eventMessage = "Successfully savepoint created"
case v1beta1.SavepointStateFailed:
eventType = corev1.EventTypeWarning
eventReason = "SavepointFailed"
Expand Down Expand Up @@ -418,10 +418,7 @@ func hasTimeElapsed(timeToCheckStr string, now time.Time, intervalSec int) bool
tc := &TimeConverter{}
timeToCheck := tc.FromString(timeToCheckStr)
intervalPassedTime := timeToCheck.Add(time.Duration(int64(intervalSec) * int64(time.Second)))
if now.After(intervalPassedTime) {
return true
}
return false
return now.After(intervalPassedTime)
}

// isComponentUpdated checks whether the component updated.
Expand Down Expand Up @@ -451,17 +448,11 @@ func isComponentUpdated(component runtime.Object, cluster v1beta1.FlinkCluster)
}
case *batchv1.Job:
if o == nil {
if cluster.Spec.Job != nil {
return false
}
return true
return cluster.Spec.Job == nil
}
case *extensionsv1beta1.Ingress:
if o == nil {
if cluster.Spec.JobManager.Ingress != nil {
return false
}
return true
return cluster.Spec.JobManager.Ingress == nil
}
}

Expand Down
8 changes: 3 additions & 5 deletions controllers/flinkcluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ func TestShouldUpdateJob(t *testing.T) {
assert.Equal(t, update, false)

// cannot update without savepointLocation
tc = &TimeConverter{}
savepointTime = time.Now()
observeTime = savepointTime.Add(time.Second * 500)
observed = ObservedClusterState{
Expand Down Expand Up @@ -581,10 +580,9 @@ func TestGetFlinkJobDeploymentState(t *testing.T) {
var pod corev1.Pod
var submit, expected *FlinkJobSubmitLog
var err error
var termMsg string

// success
termMsg = `
termMsg := `
jobID: ec74209eb4e3db8ae72db00bd7a830aa
message: |
Successfully submitted!
Expand Down Expand Up @@ -613,7 +611,7 @@ Job has been submitted with JobID ec74209eb4e3db8ae72db00bd7a830aa
assert.DeepEqual(t, *submit, *expected)

// failed: pod not found
submit, err = getFlinkJobSubmitLog(nil)
_, err = getFlinkJobSubmitLog(nil)
assert.Error(t, err, "no job pod found, even though submission completed")

// failed: message not found
Expand All @@ -624,6 +622,6 @@ Job has been submitted with JobID ec74209eb4e3db8ae72db00bd7a830aa
Terminated: &corev1.ContainerStateTerminated{
Message: "",
}}}}}}
submit, err = getFlinkJobSubmitLog(&pod)
_, err = getFlinkJobSubmitLog(&pod)
assert.Error(t, err, "job pod found, but no termination log found even though submission completed")
}
2 changes: 0 additions & 2 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

v1beta1 "github.com/spotify/flink-on-k8s-operator/api/v1beta1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
Expand All @@ -37,7 +36,6 @@ import (
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.

var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment

Expand Down

0 comments on commit de5c431

Please sign in to comment.