Skip to content

Commit

Permalink
Propate GSA to init containers (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
jto authored May 12, 2021
1 parent 6cc6afe commit fc4f4da
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
35 changes: 26 additions & 9 deletions controllers/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func getDesiredJobManagerStatefulSet(
containers = append(containers, jobManagerSpec.Sidecars...)

var podSpec = corev1.PodSpec{
InitContainers: convertJobManagerInitContainers(&jobManagerSpec),
InitContainers: convertJobManagerInitContainers(&jobManagerSpec, saMount, saEnv),
Containers: containers,
Volumes: volumes,
NodeSelector: jobManagerSpec.NodeSelector,
Expand Down Expand Up @@ -503,7 +503,7 @@ func getDesiredTaskManagerStatefulSet(
}}
containers = append(containers, taskManagerSpec.Sidecars...)
var podSpec = corev1.PodSpec{
InitContainers: convertTaskManagerInitContainers(&taskManagerSpec),
InitContainers: convertTaskManagerInitContainers(&taskManagerSpec, saMount, saEnv),
Containers: containers,
Volumes: volumes,
NodeSelector: taskManagerSpec.NodeSelector,
Expand Down Expand Up @@ -735,7 +735,7 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
envVars = append(envVars, flinkCluster.Spec.EnvVars...)

var podSpec = corev1.PodSpec{
InitContainers: convertJobInitContainers(jobSpec),
InitContainers: convertJobInitContainers(jobSpec, saMount, saEnv),
Containers: []corev1.Container{
corev1.Container{
Name: "main",
Expand Down Expand Up @@ -837,16 +837,33 @@ func ensureVolumeMountsInitContainer(initContainers []corev1.Container, volumeMo
return updatedInitContainers
}

func convertJobManagerInitContainers(jobManagerSpec *v1beta1.JobManagerSpec) []corev1.Container {
return ensureVolumeMountsInitContainer(jobManagerSpec.InitContainers, jobManagerSpec.VolumeMounts)
func setGSAEnv(initContainers []corev1.Container, saMount *corev1.VolumeMount, saEnv *corev1.EnvVar) []corev1.Container {
updatedInitContainers := []corev1.Container{}
for _, initContainer := range initContainers {
if saEnv != nil {
initContainer.Env = append(initContainer.Env, *saEnv)
}
if saMount != nil {
initContainer.VolumeMounts = append(initContainer.VolumeMounts, *saMount)
}
updatedInitContainers = append(updatedInitContainers, initContainer)
}
return updatedInitContainers
}

func convertJobManagerInitContainers(jobManagerSpec *v1beta1.JobManagerSpec, saMount *corev1.VolumeMount, saEnv *corev1.EnvVar) []corev1.Container {
updatedInitContainers := ensureVolumeMountsInitContainer(jobManagerSpec.InitContainers, jobManagerSpec.VolumeMounts)
return setGSAEnv(updatedInitContainers, saMount, saEnv)
}

func convertTaskManagerInitContainers(taskSpec *v1beta1.TaskManagerSpec) []corev1.Container {
return ensureVolumeMountsInitContainer(taskSpec.InitContainers, taskSpec.VolumeMounts)
func convertTaskManagerInitContainers(taskSpec *v1beta1.TaskManagerSpec, saMount *corev1.VolumeMount, saEnv *corev1.EnvVar) []corev1.Container {
updatedInitContainers := ensureVolumeMountsInitContainer(taskSpec.InitContainers, taskSpec.VolumeMounts)
return setGSAEnv(updatedInitContainers, saMount, saEnv)
}

func convertJobInitContainers(jobSpec *v1beta1.JobSpec) []corev1.Container {
return ensureVolumeMountsInitContainer(jobSpec.InitContainers, jobSpec.VolumeMounts)
func convertJobInitContainers(jobSpec *v1beta1.JobSpec, saMount *corev1.VolumeMount, saEnv *corev1.EnvVar) []corev1.Container {
updatedInitContainers := ensureVolumeMountsInitContainer(jobSpec.InitContainers, jobSpec.VolumeMounts)
return setGSAEnv(updatedInitContainers, saMount, saEnv)
}

// Converts the FlinkCluster as owner reference for its child resources.
Expand Down
11 changes: 11 additions & 0 deletions controllers/flinkcluster_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,17 @@ func TestGetDesiredClusterState(t *testing.T) {
},
VolumeMounts: []v1.VolumeMount{
{Name: "cache-volume", MountPath: "/cache"},
{
Name: "gcp-service-account-volume",
MountPath: "/etc/gcp_service_account/",
ReadOnly: true,
},
},
Env: []v1.EnvVar{
{
Name: "GOOGLE_APPLICATION_CREDENTIALS",
Value: "/etc/gcp_service_account/gcp_service_account_key.json",
},
},
},
},
Expand Down

0 comments on commit fc4f4da

Please sign in to comment.