From fc4f4dacf6bb9cf54c81a72195a30525f7255d61 Mon Sep 17 00:00:00 2001 From: Julien Tournay Date: Wed, 12 May 2021 10:41:09 +0200 Subject: [PATCH] Propate GSA to init containers (#32) --- controllers/flinkcluster_converter.go | 35 ++++++++++++++++------ controllers/flinkcluster_converter_test.go | 11 +++++++ 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go index 238aa74c..406d5cf3 100644 --- a/controllers/flinkcluster_converter.go +++ b/controllers/flinkcluster_converter.go @@ -204,7 +204,7 @@ func getDesiredJobManagerStatefulSet( containers = append(containers, jobManagerSpec.Sidecars...) var podSpec = corev1.PodSpec{ - InitContainers: convertJobManagerInitContainers(&jobManagerSpec), + InitContainers: convertJobManagerInitContainers(&jobManagerSpec, saMount, saEnv), Containers: containers, Volumes: volumes, NodeSelector: jobManagerSpec.NodeSelector, @@ -503,7 +503,7 @@ func getDesiredTaskManagerStatefulSet( }} containers = append(containers, taskManagerSpec.Sidecars...) var podSpec = corev1.PodSpec{ - InitContainers: convertTaskManagerInitContainers(&taskManagerSpec), + InitContainers: convertTaskManagerInitContainers(&taskManagerSpec, saMount, saEnv), Containers: containers, Volumes: volumes, NodeSelector: taskManagerSpec.NodeSelector, @@ -735,7 +735,7 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job { envVars = append(envVars, flinkCluster.Spec.EnvVars...) var podSpec = corev1.PodSpec{ - InitContainers: convertJobInitContainers(jobSpec), + InitContainers: convertJobInitContainers(jobSpec, saMount, saEnv), Containers: []corev1.Container{ corev1.Container{ Name: "main", @@ -837,16 +837,33 @@ func ensureVolumeMountsInitContainer(initContainers []corev1.Container, volumeMo return updatedInitContainers } -func convertJobManagerInitContainers(jobManagerSpec *v1beta1.JobManagerSpec) []corev1.Container { - return ensureVolumeMountsInitContainer(jobManagerSpec.InitContainers, jobManagerSpec.VolumeMounts) +func setGSAEnv(initContainers []corev1.Container, saMount *corev1.VolumeMount, saEnv *corev1.EnvVar) []corev1.Container { + updatedInitContainers := []corev1.Container{} + for _, initContainer := range initContainers { + if saEnv != nil { + initContainer.Env = append(initContainer.Env, *saEnv) + } + if saMount != nil { + initContainer.VolumeMounts = append(initContainer.VolumeMounts, *saMount) + } + updatedInitContainers = append(updatedInitContainers, initContainer) + } + return updatedInitContainers +} + +func convertJobManagerInitContainers(jobManagerSpec *v1beta1.JobManagerSpec, saMount *corev1.VolumeMount, saEnv *corev1.EnvVar) []corev1.Container { + updatedInitContainers := ensureVolumeMountsInitContainer(jobManagerSpec.InitContainers, jobManagerSpec.VolumeMounts) + return setGSAEnv(updatedInitContainers, saMount, saEnv) } -func convertTaskManagerInitContainers(taskSpec *v1beta1.TaskManagerSpec) []corev1.Container { - return ensureVolumeMountsInitContainer(taskSpec.InitContainers, taskSpec.VolumeMounts) +func convertTaskManagerInitContainers(taskSpec *v1beta1.TaskManagerSpec, saMount *corev1.VolumeMount, saEnv *corev1.EnvVar) []corev1.Container { + updatedInitContainers := ensureVolumeMountsInitContainer(taskSpec.InitContainers, taskSpec.VolumeMounts) + return setGSAEnv(updatedInitContainers, saMount, saEnv) } -func convertJobInitContainers(jobSpec *v1beta1.JobSpec) []corev1.Container { - return ensureVolumeMountsInitContainer(jobSpec.InitContainers, jobSpec.VolumeMounts) +func convertJobInitContainers(jobSpec *v1beta1.JobSpec, saMount *corev1.VolumeMount, saEnv *corev1.EnvVar) []corev1.Container { + updatedInitContainers := ensureVolumeMountsInitContainer(jobSpec.InitContainers, jobSpec.VolumeMounts) + return setGSAEnv(updatedInitContainers, saMount, saEnv) } // Converts the FlinkCluster as owner reference for its child resources. diff --git a/controllers/flinkcluster_converter_test.go b/controllers/flinkcluster_converter_test.go index 6497ac08..7ba9a165 100644 --- a/controllers/flinkcluster_converter_test.go +++ b/controllers/flinkcluster_converter_test.go @@ -807,6 +807,17 @@ func TestGetDesiredClusterState(t *testing.T) { }, VolumeMounts: []v1.VolumeMount{ {Name: "cache-volume", MountPath: "/cache"}, + { + Name: "gcp-service-account-volume", + MountPath: "/etc/gcp_service_account/", + ReadOnly: true, + }, + }, + Env: []v1.EnvVar{ + { + Name: "GOOGLE_APPLICATION_CREDENTIALS", + Value: "/etc/gcp_service_account/gcp_service_account_key.json", + }, }, }, },