diff --git a/api/v1beta1/flinkcluster_validate.go b/api/v1beta1/flinkcluster_validate.go index 867338da..6c95904e 100644 --- a/api/v1beta1/flinkcluster_validate.go +++ b/api/v1beta1/flinkcluster_validate.go @@ -122,7 +122,7 @@ func (v *Validator) ValidateUpdate(old *FlinkCluster, new *FlinkCluster) error { } func (v *Validator) checkControlAnnotations(old *FlinkCluster, new *FlinkCluster) error { - oldUserControl, _ := old.Annotations[ControlAnnotation] + oldUserControl := old.Annotations[ControlAnnotation] newUserControl, ok := new.Annotations[ControlAnnotation] if ok { if oldUserControl != newUserControl && old.Status.Control != nil && old.Status.Control.State == ControlStateProgressing { @@ -248,10 +248,10 @@ func (v *Validator) validateHadoopConfig(hadoopConfig *HadoopConfig) error { return nil } if len(hadoopConfig.ConfigMapName) == 0 { - return fmt.Errorf("Hadoop ConfigMap name is unspecified") + return fmt.Errorf("hadoop ConfigMap name is unspecified") } if len(hadoopConfig.MountPath) == 0 { - return fmt.Errorf("Hadoop config volume mount path is unspecified") + return fmt.Errorf("hadoop config volume mount path is unspecified") } return nil } diff --git a/api/v1beta1/flinkcluster_validate_test.go b/api/v1beta1/flinkcluster_validate_test.go index ea7871cd..d4840317 100644 --- a/api/v1beta1/flinkcluster_validate_test.go +++ b/api/v1beta1/flinkcluster_validate_test.go @@ -750,7 +750,7 @@ func TestInvalidHadoopConfig(t *testing.T) { MountPath: "/etc/hadoop/conf", } var err1 = validator.validateHadoopConfig(&hadoopConfig1) - var expectedErr1 = "Hadoop ConfigMap name is unspecified" + var expectedErr1 = "hadoop ConfigMap name is unspecified" assert.Assert(t, err1 != nil, "err is not expected to be nil") assert.Equal(t, err1.Error(), expectedErr1) @@ -759,7 +759,7 @@ func TestInvalidHadoopConfig(t *testing.T) { MountPath: "", } var err2 = validator.validateHadoopConfig(&hadoopConfig2) - var expectedErr2 = "Hadoop config volume mount path is unspecified" + var expectedErr2 = "hadoop config volume mount path is unspecified" assert.Assert(t, err2 != nil, "err is not expected to be nil") assert.Equal(t, err2.Error(), expectedErr2) } diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go index 406d5cf3..f65f0f98 100644 --- a/controllers/flinkcluster_converter.go +++ b/controllers/flinkcluster_converter.go @@ -187,7 +187,7 @@ func getDesiredJobManagerStatefulSet( } envVars = append(envVars, flinkCluster.Spec.EnvVars...) - var containers = []corev1.Container{corev1.Container{ + var containers = []corev1.Container{{ Name: "jobmanager", Image: imageSpec.Name, ImagePullPolicy: imageSpec.PullPolicy, @@ -334,7 +334,7 @@ func getDesiredJobManagerIngress( if jobManagerIngressSpec.HostFormat != nil { ingressHost = getJobManagerIngressHost(*jobManagerIngressSpec.HostFormat, clusterName) } - if jobManagerIngressSpec.UseTLS != nil && *jobManagerIngressSpec.UseTLS == true { + if jobManagerIngressSpec.UseTLS != nil && *jobManagerIngressSpec.UseTLS { var secretName string var hosts []string if ingressHost != "" { @@ -488,7 +488,7 @@ func getDesiredTaskManagerStatefulSet( } envVars = append(envVars, flinkCluster.Spec.EnvVars...) - var containers = []corev1.Container{corev1.Container{ + var containers = []corev1.Container{{ Name: "taskmanager", Image: imageSpec.Name, ImagePullPolicy: imageSpec.PullPolicy, @@ -657,7 +657,7 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job { } if jobSpec.AllowNonRestoredState != nil && - *jobSpec.AllowNonRestoredState == true { + *jobSpec.AllowNonRestoredState { jobArgs = append(jobArgs, "--allowNonRestoredState") } @@ -666,7 +666,7 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job { } if jobSpec.NoLoggingToStdout != nil && - *jobSpec.NoLoggingToStdout == true { + *jobSpec.NoLoggingToStdout { jobArgs = append(jobArgs, "--sysoutLogging") } jobArgs = append(jobArgs, "--detached") @@ -737,7 +737,7 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job { var podSpec = corev1.PodSpec{ InitContainers: convertJobInitContainers(jobSpec, saMount, saEnv), Containers: []corev1.Container{ - corev1.Container{ + { Name: "main", Image: imageSpec.Name, ImagePullPolicy: imageSpec.PullPolicy, diff --git a/controllers/flinkcluster_converter_test.go b/controllers/flinkcluster_converter_test.go index 7ba9a165..7f673f45 100644 --- a/controllers/flinkcluster_converter_test.go +++ b/controllers/flinkcluster_converter_test.go @@ -27,7 +27,6 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" extensionsv1beta1 "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -247,16 +246,16 @@ func TestGetDesiredClusterState(t *testing.T) { VolumeMounts: []corev1.VolumeMount{ {Name: "cache-volume", MountPath: "/cache"}, }, - VolumeClaimTemplates: []v1.PersistentVolumeClaim{ + VolumeClaimTemplates: []corev1.PersistentVolumeClaim{ { ObjectMeta: metav1.ObjectMeta{ Name: "pvc-test", }, - Spec: v1.PersistentVolumeClaimSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, - Resources: v1.ResourceRequirements{ - Requests: map[v1.ResourceName]resource.Quantity{ - v1.ResourceStorage: resource.MustParse("100Gi"), + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceStorage: resource.MustParse("100Gi"), }, }, StorageClassName: &storageClassName, @@ -499,14 +498,14 @@ func TestGetDesiredClusterState(t *testing.T) { }, }, }, - Spec: v1.ServiceSpec{ + Spec: corev1.ServiceSpec{ Type: "LoadBalancer", Selector: map[string]string{ "app": "flink", "cluster": "flinkjobcluster-sample", "component": "jobmanager", }, - Ports: []v1.ServicePort{ + Ports: []corev1.ServicePort{ {Name: "rpc", Port: 6123, TargetPort: intstr.FromString("rpc")}, {Name: "blob", Port: 6124, TargetPort: intstr.FromString("blob")}, {Name: "query", Port: 6125, TargetPort: intstr.FromString("query")}, @@ -604,16 +603,16 @@ func TestGetDesiredClusterState(t *testing.T) { "component": "taskmanager", }, }, - VolumeClaimTemplates: []v1.PersistentVolumeClaim{ + VolumeClaimTemplates: []corev1.PersistentVolumeClaim{ { ObjectMeta: metav1.ObjectMeta{ Name: "pvc-test", }, - Spec: v1.PersistentVolumeClaimSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, - Resources: v1.ResourceRequirements{ - Requests: map[v1.ResourceName]resource.Quantity{ - v1.ResourceStorage: resource.MustParse("100Gi"), + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceStorage: resource.MustParse("100Gi"), }, }, StorageClassName: &storageClassName, @@ -634,7 +633,7 @@ func TestGetDesiredClusterState(t *testing.T) { Spec: corev1.PodSpec{ InitContainers: make([]corev1.Container, 0), Containers: []corev1.Container{ - corev1.Container{ + { Name: "taskmanager", Image: "flink:1.8.1", Args: []string{"taskmanager"}, @@ -695,7 +694,7 @@ func TestGetDesiredClusterState(t *testing.T) { corev1.ResourceMemory: resource.MustParse("1Gi"), }, }, - VolumeMounts: []v1.VolumeMount{ + VolumeMounts: []corev1.VolumeMount{ {Name: "cache-volume", MountPath: "/cache"}, {Name: "flink-config-volume", MountPath: "/opt/flink/conf"}, { @@ -710,7 +709,7 @@ func TestGetDesiredClusterState(t *testing.T) { }, }, }, - corev1.Container{Name: "sidecar", Image: "alpine"}, + {Name: "sidecar", Image: "alpine"}, }, Volumes: []corev1.Volume{ { @@ -786,7 +785,7 @@ func TestGetDesiredClusterState(t *testing.T) { }, }, Spec: batchv1.JobSpec{ - Template: v1.PodTemplateSpec{ + Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "app": "flink", @@ -796,8 +795,8 @@ func TestGetDesiredClusterState(t *testing.T) { "example.com": "example", }, }, - Spec: v1.PodSpec{ - InitContainers: []v1.Container{ + Spec: corev1.PodSpec{ + InitContainers: []corev1.Container{ { Name: "gcs-downloader", Image: "google/cloud-sdk", @@ -805,7 +804,7 @@ func TestGetDesiredClusterState(t *testing.T) { Args: []string{ "cp", "gs://my-bucket/my-job.jar", "/cache/my-job.jar", }, - VolumeMounts: []v1.VolumeMount{ + VolumeMounts: []corev1.VolumeMount{ {Name: "cache-volume", MountPath: "/cache"}, { Name: "gcp-service-account-volume", @@ -821,7 +820,7 @@ func TestGetDesiredClusterState(t *testing.T) { }, }, }, - Containers: []v1.Container{ + Containers: []corev1.Container{ { Name: "main", Image: "flink:1.8.1", @@ -839,7 +838,7 @@ func TestGetDesiredClusterState(t *testing.T) { "--input", "./README.txt", }, - Env: []v1.EnvVar{ + Env: []corev1.EnvVar{ {Name: "FLINK_JM_ADDR", Value: "flinkjobcluster-sample-jobmanager:8081"}, {Name: "HADOOP_CONF_DIR", Value: "/etc/hadoop/conf"}, { @@ -867,7 +866,7 @@ func TestGetDesiredClusterState(t *testing.T) { corev1.ResourceMemory: resource.MustParse("512Mi"), }, }, - VolumeMounts: []v1.VolumeMount{ + VolumeMounts: []corev1.VolumeMount{ {Name: "cache-volume", MountPath: "/cache"}, { Name: "flink-config-volume", diff --git a/controllers/flinkcluster_observer.go b/controllers/flinkcluster_observer.go index 8c48fcb3..bb4f2fc3 100644 --- a/controllers/flinkcluster_observer.go +++ b/controllers/flinkcluster_observer.go @@ -250,10 +250,8 @@ func (observer *ClusterStateObserver) observeJob( var jobInPendingState = recordedJobStatus != nil && recordedJobStatus.State == v1beta1.JobStatePending var flinkJobID string if jobSubmitCompleted && jobInPendingState { - var observedJobPod *corev1.Pod - // Get job submitter pod resource. - observedJobPod = new(corev1.Pod) + var observedJobPod *corev1.Pod = new(corev1.Pod) err = observer.observeJobPod(observedJobPod) if err != nil { log.Error(err, "Failed to get job pod") @@ -350,7 +348,6 @@ func (observer *ClusterStateObserver) observeFlinkJobStatus( log.Info("Observed Flink job", "flink job", *flinkJob) } - return } func (observer *ClusterStateObserver) observeSavepoint(observed *ObservedClusterState) error { diff --git a/controllers/flinkcluster_util.go b/controllers/flinkcluster_util.go index 885c5b83..c5f3c223 100644 --- a/controllers/flinkcluster_util.go +++ b/controllers/flinkcluster_util.go @@ -360,7 +360,7 @@ func getSavepointEvent(status v1beta1.SavepointStatus) (eventType string, eventR case v1beta1.SavepointStateSucceeded: eventType = corev1.EventTypeNormal eventReason = "SavepointCreated" - eventMessage = fmt.Sprintf("Successfully savepoint created") + eventMessage = "Successfully savepoint created" case v1beta1.SavepointStateFailed: eventType = corev1.EventTypeWarning eventReason = "SavepointFailed" @@ -418,10 +418,7 @@ func hasTimeElapsed(timeToCheckStr string, now time.Time, intervalSec int) bool tc := &TimeConverter{} timeToCheck := tc.FromString(timeToCheckStr) intervalPassedTime := timeToCheck.Add(time.Duration(int64(intervalSec) * int64(time.Second))) - if now.After(intervalPassedTime) { - return true - } - return false + return now.After(intervalPassedTime) } // isComponentUpdated checks whether the component updated. @@ -451,17 +448,11 @@ func isComponentUpdated(component runtime.Object, cluster v1beta1.FlinkCluster) } case *batchv1.Job: if o == nil { - if cluster.Spec.Job != nil { - return false - } - return true + return cluster.Spec.Job == nil } case *extensionsv1beta1.Ingress: if o == nil { - if cluster.Spec.JobManager.Ingress != nil { - return false - } - return true + return cluster.Spec.JobManager.Ingress == nil } } diff --git a/controllers/flinkcluster_util_test.go b/controllers/flinkcluster_util_test.go index df27a422..46334233 100644 --- a/controllers/flinkcluster_util_test.go +++ b/controllers/flinkcluster_util_test.go @@ -279,7 +279,6 @@ func TestShouldUpdateJob(t *testing.T) { assert.Equal(t, update, false) // cannot update without savepointLocation - tc = &TimeConverter{} savepointTime = time.Now() observeTime = savepointTime.Add(time.Second * 500) observed = ObservedClusterState{ @@ -581,10 +580,9 @@ func TestGetFlinkJobDeploymentState(t *testing.T) { var pod corev1.Pod var submit, expected *FlinkJobSubmitLog var err error - var termMsg string // success - termMsg = ` + termMsg := ` jobID: ec74209eb4e3db8ae72db00bd7a830aa message: | Successfully submitted! @@ -613,7 +611,7 @@ Job has been submitted with JobID ec74209eb4e3db8ae72db00bd7a830aa assert.DeepEqual(t, *submit, *expected) // failed: pod not found - submit, err = getFlinkJobSubmitLog(nil) + _, err = getFlinkJobSubmitLog(nil) assert.Error(t, err, "no job pod found, even though submission completed") // failed: message not found @@ -624,6 +622,6 @@ Job has been submitted with JobID ec74209eb4e3db8ae72db00bd7a830aa Terminated: &corev1.ContainerStateTerminated{ Message: "", }}}}}} - submit, err = getFlinkJobSubmitLog(&pod) + _, err = getFlinkJobSubmitLog(&pod) assert.Error(t, err, "job pod found, but no termination log found even though submission completed") } diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 7c2bec95..656811cc 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -25,7 +25,6 @@ import ( v1beta1 "github.com/spotify/flink-on-k8s-operator/api/v1beta1" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/envtest/printer" @@ -37,7 +36,6 @@ import ( // These tests use Ginkgo (BDD-style Go testing framework). Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. -var cfg *rest.Config var k8sClient client.Client var testEnv *envtest.Environment