diff --git a/docs/09-function-runner-pod-templates.md b/docs/09-function-runner-pod-templates.md
new file mode 100644
index 00000000..bc944e14
--- /dev/null
+++ b/docs/09-function-runner-pod-templates.md
@@ -0,0 +1,88 @@
+# Function runner pod templating
+
+## Why
+
+`porch-fn-runner` implements a simple function-as-a-service for executing kpt functions, running each kpt function wrapped in a gRPC server. The function runner starts a number of function evaluator pods for each of the kpt functions. As with any operator that manages pods, it is useful to provide templating and parametrization capabilities for the pods that the function runner manages.
+
+## Contract for writing pod templates
+
+Any function evaluator pod template has to fulfill the following contract:
+
+1. There is a container named "function".
+2. The entrypoint of the "function" container starts the wrapper gRPC server.
+3. The image of the "function" container can be set to the kpt function's image without preventing the entrypoint from starting.
+4. The entries from the Dockerfile ENTRYPOINT of the kpt function image can be appended to the arguments of the "function" container.
+
+## Enabling pod templating on the function runner
+
+A ConfigMap holding the pod template should be created in the namespace where the porch-fn-runner pod is running.
+The ConfigMap's name should be passed to the function runner via the `--function-pod-template` command line argument in its pod spec.
+
+```yaml
+...
+spec:
+  serviceAccountName: porch-fn-runner
+  containers:
+    - name: function-runner
+      image: gcr.io/example-google-project-id/porch-function-runner:latest
+      imagePullPolicy: IfNotPresent
+      command:
+        - /server
+        - --config=/config.yaml
+        - --functions=/functions
+        - --pod-namespace=porch-fn-system
+        - --function-pod-template=kpt-function-eval-pod-template
+      env:
+        - name: WRAPPER_SERVER_IMAGE
+          value: gcr.io/example-google-project-id/porch-wrapper-server:latest
+      ports:
+        - containerPort: 9445
+      # Add gRPC readiness probe to ensure the cache is ready
+      readinessProbe:
+        exec:
+          command:
+            - /grpc-health-probe
+            - -addr
+            - localhost:9445
+...
+```
+
+## Example pod template
+
+The pod template ConfigMap below matches the default behavior:
+
+```yaml
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: kpt-function-eval-pod-template
+data:
+  template: |
+    apiVersion: v1
+    kind: Pod
+    metadata:
+      annotations:
+        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
+    spec:
+      initContainers:
+        - name: copy-wrapper-server
+          image: gcr.io/example-google-project-id/porch-wrapper-server:latest
+          command:
+            - cp
+            - -a
+            - /wrapper-server/.
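+            # destination: the emptyDir volume that is shared with the "function" container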
+            - /wrapper-server-tools
+          volumeMounts:
+            - name: wrapper-server-tools
+              mountPath: /wrapper-server-tools
+      containers:
+        - name: function
+          image: image-replaced-by-kpt-func-image
+          command:
+            - /wrapper-server-tools/wrapper-server
+          volumeMounts:
+            - name: wrapper-server-tools
+              mountPath: /wrapper-server-tools
+      volumes:
+        - name: wrapper-server-tools
+          emptyDir: {}
+```
\ No newline at end of file
diff --git a/func/internal/podevaluator.go b/func/internal/podevaluator.go
index 1b7c4649..92a4d89b 100644
--- a/func/internal/podevaluator.go
+++ b/func/internal/podevaluator.go
@@ -29,6 +29,7 @@ import (
 	"github.com/google/go-containerregistry/pkg/name"
 	"github.com/google/go-containerregistry/pkg/v1/remote"
 	"github.com/nephio-project/porch/func/evaluator"
+	util "github.com/nephio-project/porch/pkg/util"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
 	"gopkg.in/yaml.v2"
@@ -37,20 +38,25 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
+	yamlutil "k8s.io/apimachinery/pkg/util/yaml"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/config"
 )

 const (
-	defaultWrapperServerPort = "9446"
-	volumeName               = "wrapper-server-tools"
-	volumeMountPath          = "/wrapper-server-tools"
-	wrapperServerBin         = "wrapper-server"
-	gRPCProbeBin             = "grpc-health-probe"
-	krmFunctionLabel         = "fn.kpt.dev/image"
-	reclaimAfterAnnotation   = "fn.kpt.dev/reclaim-after"
-	fieldManagerName         = "krm-function-runner"
+	defaultWrapperServerPort  = "9446"
+	volumeName                = "wrapper-server-tools"
+	volumeMountPath           = "/wrapper-server-tools"
+	wrapperServerBin          = "wrapper-server"
+	gRPCProbeBin              = "grpc-health-probe"
+	krmFunctionLabel          = "fn.kpt.dev/image"
+	reclaimAfterAnnotation    = "fn.kpt.dev/reclaim-after"
+	templateVersionAnnotation = "fn.kpt.dev/template-version"
+	inlineTemplateVersionv1   = "inline-v1"
+	fieldManagerName          = "krm-function-runner"
+	functionContainerName     = "function"
+	defaultManagerNamespace   = "porch-system"

 	channelBufferSize = 128
 )
@@ -63,7 +69,7 @@ type podEvaluator struct {

 var _ Evaluator = &podEvaluator{}

-func NewPodEvaluator(namespace, wrapperServerImage string, interval, ttl time.Duration, podTTLConfig string) (Evaluator, error) {
+func NewPodEvaluator(namespace, wrapperServerImage string, interval, ttl time.Duration, podTTLConfig string, functionPodTemplateName string) (Evaluator, error) {
 	restCfg, err := config.GetConfig()
 	if err != nil {
 		return nil, fmt.Errorf("failed to get rest config: %w", err)
 	}
@@ -79,6 +85,13 @@
 		return nil, fmt.Errorf("failed to create client: %w", err)
 	}

+	managerNs, err := util.GetInClusterNamespace()
+	if err != nil {
+		klog.Errorf("failed to get the namespace where the function-runner is running: %v", err)
+		klog.Warningf("could not determine the function-runner's namespace; assuming a test setup and defaulting to %q", defaultManagerNamespace)
+		managerNs = defaultManagerNamespace
+	}
+
 	reqCh := make(chan *clientConnRequest, channelBufferSize)
 	readyCh := make(chan *imagePodAndGRPCClient, channelBufferSize)

@@ -93,10 +106,13 @@
 		waitlists: map[string][]chan<- *clientConnAndError{},

 		podManager: &podManager{
-			kubeClient:         cl,
-			namespace:          namespace,
-			wrapperServerImage: wrapperServerImage,
-			podReadyCh:         readyCh,
+			kubeClient:              cl,
+			namespace:               namespace,
+			wrapperServerImage:      wrapperServerImage,
+			podReadyCh:              readyCh,
+			functionPodTemplateName: functionPodTemplateName,
+			podReadyTimeout:         60 * time.Second,
+			managerNamespace:        managerNs,
 			},
 		},
 	}
@@ -382,6 +398,10 @@ type podManager struct {
 	kubeClient client.Client
 	// namespace holds the namespace where the executors run
 	namespace string
+
+	// managerNamespace is the namespace where the function-runner is running
+	managerNamespace string
+
 	// wrapperServerImage is the image name of the wrapper server
 	wrapperServerImage string
@@ -392,6 +412,16 @@ type podManager struct {
 	// Only podManager is allowed to touch this cache.
 	// Its underlying type is map[string]*digestAndEntrypoint.
 	imageMetadataCache sync.Map
+
+	// podReadyTimeout is how long podManager waits for the pod to become ready before reporting an error
+	podReadyTimeout time.Duration
+
+	// functionPodTemplateName is the name of a ConfigMap in the namespace where
+	// the function-runner is running. It should contain a pod manifest YAML under .data.template.
+	// The pod manifest is expected to set up wrapper-server as the entrypoint
+	// of the main container, which must be called "function".
+	// The pod manager will replace the image of the "function" container with the kpt function's image.
+	functionPodTemplateName string
 }

 type digestAndEntrypoint struct {
@@ -440,7 +470,7 @@ func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string, tt
 func (pm *podManager) imageDigestAndEntrypoint(ctx context.Context, image string) (*digestAndEntrypoint, error) {
 	start := time.Now()
 	defer func() {
-		klog.Infof("getting image metadata for %v took %v", image, time.Now().Sub(start))
+		klog.Infof("getting image metadata for %v took %v", image, time.Since(start))
 	}()
 	var entrypoint []string
 	ref, err := name.ParseReference(image)
@@ -478,6 +508,8 @@ func (pm *podManager) imageDigestAndEntrypoint(ctx context.Context, image string
 // retrieveOrCreatePod retrieves or creates a pod for an image.
 func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string, ttl time.Duration, useGenerateName bool) (client.ObjectKey, error) {
 	var de *digestAndEntrypoint
+	var replacePod bool
+	var currentPod *corev1.Pod
 	var err error
 	val, found := pm.imageMetadataCache.Load(image)
 	if !found {
 	// since the pod may be created by one the other instance and the current instance is not aware of it.
 	// TODO: It's possible to set up a Watch in the fn runner namespace, and always try to maintain a up-to-date local cache.
 	podList := &corev1.PodList{}
+	podTemplate, templateVersion, err := pm.getBasePodTemplate(ctx)
+	if err != nil {
+		klog.Errorf("failed to generate a base pod template: %v", err)
+		return client.ObjectKey{}, fmt.Errorf("failed to generate a base pod template: %w", err)
+	}
 	err = pm.kubeClient.List(ctx, podList, client.InNamespace(pm.namespace), client.MatchingLabels(map[string]string{krmFunctionLabel: podId}))
 	if err != nil {
 		klog.Warningf("error when listing pods for %q: %v", image, err)
 	}
 	if err == nil && len(podList.Items) > 0 {
 		// TODO: maybe we should randomly pick one that is no being deleted.
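+		// A pod is reused only if it was created from the current version of the
+		// pod template; a pod built from an outdated template is replaced below.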
 		for _, pod := range podList.Items {
 			if pod.DeletionTimestamp == nil {
-				klog.Infof("retrieved function evaluator pod %v/%v for %q", pod.Namespace, pod.Name, image)
-				return client.ObjectKeyFromObject(&pod), nil
+				if isPodTemplateSameVersion(&pod, templateVersion) {
+					klog.Infof("retrieved function evaluator pod %v/%v for %q", pod.Namespace, pod.Name, image)
+					return client.ObjectKeyFromObject(&pod), nil
+				} else {
+					replacePod = true
+					currentPod = &pod
+					break
+				}
 			}
 		}
 	}

-	cmd := append([]string{
-		filepath.Join(volumeMountPath, wrapperServerBin),
-		"--port", defaultWrapperServerPort, "--",
-	}, de.entrypoint...)
+	err = pm.patchNewPodContainer(podTemplate, *de, image)
+	if err != nil {
+		return client.ObjectKey{}, fmt.Errorf("unable to patch the function container: %w", err)
+	}
+	pm.patchNewPodMetadata(podTemplate, ttl, podId, templateVersion)

-	// Create a pod
-	pod := &corev1.Pod{
-		TypeMeta: metav1.TypeMeta{
-			APIVersion: "v1",
-			Kind:       "Pod",
-		},
-		ObjectMeta: metav1.ObjectMeta{
-			Namespace: pm.namespace,
-			Annotations: map[string]string{
-				reclaimAfterAnnotation: fmt.Sprintf("%v", time.Now().Add(ttl).Unix()),
-				// Add the following annotation to make it work well with the cluster autoscaler.
-				// https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/FAQ.md#what-types-of-pods-can-prevent-ca-from-removing-a-node
-				"cluster-autoscaler.kubernetes.io/safe-to-evict": "true",
+	// Server-side apply doesn't support name generation. We have to use Create
+	// if we need to use name generation.
+	if useGenerateName || replacePod {
+		podTemplate.GenerateName = podId + "-"
+		err = pm.kubeClient.Create(ctx, podTemplate, client.FieldOwner(fieldManagerName))
+		if err != nil {
+			return client.ObjectKey{}, fmt.Errorf("unable to create the pod: %w", err)
+		}
+		if replacePod {
+			err = pm.kubeClient.Delete(ctx, currentPod)
+			if err != nil {
+				return client.ObjectKey{}, fmt.Errorf("unable to clean up previous pod: %w", err)
+			}
+		}
+	} else {
+		podTemplate.Name = podId
+		err = pm.kubeClient.Patch(ctx, podTemplate, client.Apply, client.FieldOwner(fieldManagerName))
+		if err != nil {
+			return client.ObjectKey{}, fmt.Errorf("unable to apply the pod: %w", err)
+		}
+	}
+
+	klog.Infof("created KRM function evaluator pod %v/%v for %q", podTemplate.Namespace, podTemplate.Name, image)
+	return client.ObjectKeyFromObject(podTemplate), nil
+}
+
+// getBasePodTemplate either gets the pod template from the referenced ConfigMap, or from an inlined default pod template. 
Also provides the version of the template +func (pm *podManager) getBasePodTemplate(ctx context.Context) (*corev1.Pod, string, error) { + if pm.functionPodTemplateName != "" { + podTemplateCm := &corev1.ConfigMap{} + + err := pm.kubeClient.Get(ctx, client.ObjectKey{ + Name: pm.functionPodTemplateName, + Namespace: pm.managerNamespace, + }, podTemplateCm) + if err != nil { + klog.Errorf("Could not get Configmap containing function pod template: %s/%s", pm.managerNamespace, pm.functionPodTemplateName) + return nil, "", err + } + + decoder := yamlutil.NewYAMLOrJSONDecoder(strings.NewReader(podTemplateCm.Data["template"]), 100) + var basePodTemplate corev1.Pod + err = decoder.Decode(&basePodTemplate) + + if err != nil { + klog.Errorf("Could not decode function pod template: %s", pm.functionPodTemplateName) + return nil, "", fmt.Errorf("unable to decode function pod template: %w", err) + } + + return &basePodTemplate, podTemplateCm.ResourceVersion, nil + } else { + + inlineBasePodTemplate := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", }, - // The function runner can use the label to retrieve the pod. Label is function name + part of its digest. - // If a function has more than one tags pointing to the same digest, we can reuse the same pod. - // TODO: controller-runtime provides field indexer, we can potentially use it to index spec.containers[*].image field. - Labels: map[string]string{ - krmFunctionLabel: podId, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "true", + }, }, - }, - Spec: corev1.PodSpec{ - // We use initContainer to copy the wrapper server binary into the KRM function image. - InitContainers: []corev1.Container{ - { - Name: "copy-wrapper-server", - Image: pm.wrapperServerImage, - Command: []string{ - "cp", - "-a", - "/wrapper-server/.", - volumeMountPath, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: volumeName, - MountPath: volumeMountPath, + Spec: corev1.PodSpec{ + InitContainers: []corev1.Container{ + { + Name: "copy-wrapper-server", + Image: pm.wrapperServerImage, + Command: []string{ + "cp", + "-a", + "/wrapper-server/.", + volumeMountPath, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: volumeName, + MountPath: volumeMountPath, + }, }, }, }, - }, - Containers: []corev1.Container{ - { - Name: "function", - Image: image, - Command: cmd, - ReadinessProbe: &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - // TODO: use the k8s native GRPC prober when it has been rolled out in GKE. - // https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-grpc-liveness-probe - Exec: &corev1.ExecAction{ - Command: []string{ - filepath.Join(volumeMountPath, gRPCProbeBin), - "-addr", net.JoinHostPort("localhost", defaultWrapperServerPort), + Containers: []corev1.Container{ + { + Name: functionContainerName, + Image: "to-be-replaced", + Command: []string{filepath.Join(volumeMountPath, wrapperServerBin)}, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + // TODO: use the k8s native GRPC prober when it has been rolled out in GKE. 
+ // https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-grpc-liveness-probe + Exec: &corev1.ExecAction{ + Command: []string{ + filepath.Join(volumeMountPath, gRPCProbeBin), + "-addr", net.JoinHostPort("localhost", defaultWrapperServerPort), + }, }, }, }, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: volumeName, - MountPath: volumeMountPath, + VolumeMounts: []corev1.VolumeMount{ + { + Name: volumeName, + MountPath: volumeMountPath, + }, }, }, }, - }, - Volumes: []corev1.Volume{ - { - Name: volumeName, - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, + Volumes: []corev1.Volume{ + { + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, }, }, }, - }, - } - // Server-side apply doesn't support name generation. We have to use Create - // if we need to use name generation. - if useGenerateName { - pod.GenerateName = podId + "-" - err = pm.kubeClient.Create(ctx, pod, client.FieldOwner(fieldManagerName)) - if err != nil { - return client.ObjectKey{}, fmt.Errorf("unable to apply the pod: %w", err) } - } else { - pod.Name = podId - err = pm.kubeClient.Patch(ctx, pod, client.Apply, client.FieldOwner(fieldManagerName)) - if err != nil { - return client.ObjectKey{}, fmt.Errorf("unable to apply the pod: %w", err) + + return inlineBasePodTemplate, inlineTemplateVersionv1, nil + } +} + +// Patches the expected port, and the original entrypoint and image of the kpt function into the function container +func (pm *podManager) patchNewPodContainer(pod *corev1.Pod, de digestAndEntrypoint, image string) error { + var patchedContainer bool + for i := range pod.Spec.Containers { + container := &pod.Spec.Containers[i] + if container.Name == functionContainerName { + container.Args = append(container.Args, + "--port", defaultWrapperServerPort, + "--", + ) + container.Args = append(container.Args, de.entrypoint...) + container.Image = image + patchedContainer = true } } + if !patchedContainer { + return fmt.Errorf("failed to find the %v container in the pod", functionContainerName) + } + return nil +} - klog.Infof("created KRM function evaluator pod %v/%v for %q", pod.Namespace, pod.Name, image) - return client.ObjectKeyFromObject(pod), nil +// Patch labels and annotations so the cache manager can keep track of the pod +func (pm *podManager) patchNewPodMetadata(pod *corev1.Pod, ttl time.Duration, podId string, templateVersion string) { + pod.ObjectMeta.Namespace = pm.namespace + annotations := pod.ObjectMeta.Annotations + if annotations == nil { + annotations = make(map[string]string) + } + annotations[reclaimAfterAnnotation] = fmt.Sprintf("%v", time.Now().Add(ttl).Unix()) + annotations[templateVersionAnnotation] = templateVersion + pod.ObjectMeta.Annotations = annotations + + labels := pod.ObjectMeta.Labels + if labels == nil { + labels = make(map[string]string) + } + labels[krmFunctionLabel] = podId + pod.ObjectMeta.Labels = labels } // podIpIfRunningAndReady waits for the pod to be running and ready and returns the pod IP and a potential error. 
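+// It polls every 100ms and gives up after pm.podReadyTimeout.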
func (pm *podManager) podIpIfRunningAndReady(ctx context.Context, podKey client.ObjectKey) (ip string, e error) { var pod corev1.Pod // Wait until the pod is Running - if e := wait.PollImmediate(100*time.Millisecond, 60*time.Second, func() (done bool, err error) { + + if e := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, pm.podReadyTimeout, true, func(ctx context.Context) (done bool, err error) { err = pm.kubeClient.Get(ctx, podKey, &pod) if err != nil { return false, err @@ -661,3 +771,11 @@ func podID(image, hash string) (string, error) { name := strings.ReplaceAll(parts[len(parts)-1], "_", "-") return fmt.Sprintf("%v-%v", name, hash[:8]), nil } + +func isPodTemplateSameVersion(pod *corev1.Pod, templateVersion string) bool { + currVersion, found := pod.Annotations[templateVersionAnnotation] + if !found || currVersion != templateVersion { + return false + } + return true +} diff --git a/func/internal/podevaluator_podmanager_test.go b/func/internal/podevaluator_podmanager_test.go new file mode 100644 index 00000000..86ed8b4f --- /dev/null +++ b/func/internal/podevaluator_podmanager_test.go @@ -0,0 +1,723 @@ +package internal + +import ( + "context" + "fmt" + "net" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + pb "github.com/nephio-project/porch/func/evaluator" + "google.golang.org/grpc" + "gopkg.in/yaml.v3" + corev1 "k8s.io/api/core/v1" + errors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" +) + +type fakeFunctionEvalServer struct { + pb.UnimplementedFunctionEvaluatorServer + evalFunc func(ctx context.Context, req *pb.EvaluateFunctionRequest) (*pb.EvaluateFunctionResponse, error) + port string +} + +func (f *fakeFunctionEvalServer) EvaluateFunction(ctx context.Context, req *pb.EvaluateFunctionRequest) (*pb.EvaluateFunctionResponse, error) { + return f.evalFunc(ctx, req) +} + +func (f *fakeFunctionEvalServer) Start(ctx context.Context) error { + lis, err := net.Listen("tcp", ":"+f.port) + + if err != nil { + return err + } + + server := grpc.NewServer() + pb.RegisterFunctionEvaluatorServer(server, f) + + go server.Serve(lis) + + go func() { + <-ctx.Done() + server.GracefulStop() + lis.Close() + }() + return nil +} + +func TestPodManager(t *testing.T) { + + defaultSuccessEvalFunc := func(ctx context.Context, req *pb.EvaluateFunctionRequest) (*pb.EvaluateFunctionResponse, error) { + return &pb.EvaluateFunctionResponse{ResourceList: []byte("thisShouldBeKRM"), Log: []byte("Success")}, nil + } + + defaultImageMetadataCache := map[string]*digestAndEntrypoint{ + "apply-replacements": { + digest: "5245a52778d684fa698f69861fb2e058b308f6a74fed5bf2fe77d97bad5e071c", + entrypoint: []string{"/apply-replacements"}, + }, + } + + basePodTemplate := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "true", + }, + }, + Spec: corev1.PodSpec{ + InitContainers: []corev1.Container{ + { + Name: "copy-wrapper-server", + Image: "wrapper-server-init", + Command: []string{ + "cp", + "-a", + "/wrapper-server/.", + volumeMountPath, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: volumeName, + MountPath: volumeMountPath, + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: functionContainerName, + Image: "to-be-replaced", + Command: 
[]string{filepath.Join(volumeMountPath, wrapperServerBin)}, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + // TODO: use the k8s native GRPC prober when it has been rolled out in GKE. + // https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-grpc-liveness-probe + Exec: &corev1.ExecAction{ + Command: []string{ + filepath.Join(volumeMountPath, gRPCProbeBin), + "-addr", net.JoinHostPort("localhost", defaultWrapperServerPort), + }, + }, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: volumeName, + MountPath: volumeMountPath, + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + } + + tests := []struct { + name string + expectFail bool + skip bool + kubeClient client.WithWatch + namespace string + wrapperServerImage string + imageMetadataCache map[string]*digestAndEntrypoint + evalFunc func(ctx context.Context, req *pb.EvaluateFunctionRequest) (*pb.EvaluateFunctionResponse, error) + functionImage string + podPatch *corev1.Pod + useGenerateName bool + functionPodTemplateName string + managerNamespace string + podPatchAfter time.Duration + }{ + { + name: "Connect to existing pod", + skip: false, + expectFail: false, + functionImage: "apply-replacements", + kubeClient: fake.NewClientBuilder().WithObjects([]client.Object{&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "apply-replacements-5245a527", + Namespace: "porch-fn-system", + Labels: map[string]string{ + krmFunctionLabel: "apply-replacements-5245a527", + }, + Annotations: map[string]string{ + templateVersionAnnotation: inlineTemplateVersionv1, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "function", + Image: "apply-replacements", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + PodIP: "localhost", + }, + }}...).Build(), + namespace: "porch-fn-system", + wrapperServerImage: "wrapper-server", + imageMetadataCache: defaultImageMetadataCache, + evalFunc: defaultSuccessEvalFunc, + useGenerateName: true, + }, + { + name: "Pod is in deleting state", + skip: false, + expectFail: false, + functionImage: "apply-replacements", + kubeClient: fake.NewClientBuilder().WithObjects([]client.Object{&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "apply-replacements-5245a527", + Namespace: "porch-fn-system", + Labels: map[string]string{ + krmFunctionLabel: "apply-replacements-5245a527", + }, + DeletionTimestamp: &metav1.Time{Time: time.Now()}, + Finalizers: []string{"test-finalizer"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "function", + Image: "apply-replacements", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }, + PodIP: "localhost", + }, + }}...).Build(), + namespace: "porch-fn-system", + wrapperServerImage: "wrapper-server", + imageMetadataCache: defaultImageMetadataCache, + evalFunc: defaultSuccessEvalFunc, + useGenerateName: true, + podPatch: &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + PodIP: "localhost", + }, + }, + }, + { + name: "Create a new pod", + skip: false, + 
expectFail: false, + functionImage: "apply-replacements", + kubeClient: fake.NewClientBuilder().Build(), + namespace: "porch-fn-system", + wrapperServerImage: "wrapper-server", + imageMetadataCache: defaultImageMetadataCache, + evalFunc: defaultSuccessEvalFunc, + useGenerateName: true, + podPatch: &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + PodIP: "localhost", + }, + }, + }, + { + name: "Create pod without name generation", + skip: false, + expectFail: false, + functionImage: "apply-replacements", + kubeClient: fake.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{ + Patch: fakeClientPatchFixInterceptor, + }).Build(), + namespace: "porch-fn-system", + wrapperServerImage: "wrapper-server", + imageMetadataCache: defaultImageMetadataCache, + evalFunc: defaultSuccessEvalFunc, + useGenerateName: false, + podPatch: &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + PodIP: "localhost", + }, + }, + }, + { + name: "Pod startup takes too long", + skip: false, + expectFail: true, + functionImage: "apply-replacements", + kubeClient: fake.NewClientBuilder().Build(), + namespace: "porch-fn-system", + wrapperServerImage: "wrapper-server", + imageMetadataCache: defaultImageMetadataCache, + evalFunc: defaultSuccessEvalFunc, + useGenerateName: true, + podPatch: &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodScheduled, + Status: corev1.ConditionTrue, + }, + }, + PodIP: "localhost", + }, + }, + }, + { + name: "Pod startup takes some time", + skip: false, + expectFail: true, + functionImage: "apply-replacements", + kubeClient: fake.NewClientBuilder().Build(), + namespace: "porch-fn-system", + wrapperServerImage: "wrapper-server", + imageMetadataCache: defaultImageMetadataCache, + evalFunc: defaultSuccessEvalFunc, + useGenerateName: true, + podPatchAfter: 100 * time.Millisecond, + podPatch: &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodScheduled, + Status: corev1.ConditionTrue, + }, + }, + PodIP: "localhost", + }, + }, + }, + { + name: "Fail pod creation", + skip: false, + expectFail: true, + functionImage: "apply-replacements", + kubeClient: fake.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{ + Create: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.CreateOption) error { + if obj.GetObjectKind().GroupVersionKind().Kind == "Pod" { + return errors.NewInternalError(fmt.Errorf("Faked error")) + } + return nil + }, + }).Build(), + namespace: "porch-fn-system", + wrapperServerImage: "wrapper-server", + imageMetadataCache: defaultImageMetadataCache, + evalFunc: defaultSuccessEvalFunc, + useGenerateName: true, + }, + { //This is current behavior, but is it correct? 
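+			// (a List error is only logged; retrieveOrCreatePod falls through and creates a fresh pod)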
+ name: "If listing pods fail, try to create a new one", + skip: false, + expectFail: false, + functionImage: "apply-replacements", + kubeClient: fake.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{ + List: func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + _, ok := list.(*corev1.PodList) + if ok { + return errors.NewInternalError(fmt.Errorf("Faked error")) + } + return nil + }, + }).WithObjects([]client.Object{&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "apply-replacements-5245a527", + Namespace: "porch-fn-system", + Labels: map[string]string{ + krmFunctionLabel: "apply-replacements-5245a527", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "function", + Image: "apply-replacements", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + PodIP: "localhost", + }, + }}...).Build(), + namespace: "porch-fn-system", + wrapperServerImage: "wrapper-server", + imageMetadataCache: defaultImageMetadataCache, + evalFunc: defaultSuccessEvalFunc, + useGenerateName: true, + podPatch: &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + PodIP: "localhost", + }, + }, + }, + { + name: "Has invalid function image name", + skip: false, + expectFail: true, + functionImage: "invalid@ociref.com", + kubeClient: fake.NewClientBuilder().Build(), + namespace: "porch-fn-system", + wrapperServerImage: "wrapper-server", + imageMetadataCache: defaultImageMetadataCache, + evalFunc: defaultSuccessEvalFunc, + useGenerateName: true, + }, + { + name: "Invalid namespace name", + skip: false, + expectFail: true, + functionImage: "apply-replacements", + kubeClient: fake.NewClientBuilder().Build(), + namespace: "not a valid namespace", + wrapperServerImage: "wrapper-server", + imageMetadataCache: defaultImageMetadataCache, + evalFunc: defaultSuccessEvalFunc, + useGenerateName: true, + }, + { + name: "Function template configmap not found", + skip: false, + expectFail: true, + functionImage: "apply-replacements", + kubeClient: fake.NewClientBuilder().Build(), + namespace: "porch-fn-system", + wrapperServerImage: "wrapper-server", + imageMetadataCache: defaultImageMetadataCache, + evalFunc: defaultSuccessEvalFunc, + useGenerateName: true, + functionPodTemplateName: "function-pod-template", + managerNamespace: defaultManagerNamespace, + }, + { + name: "Function template invalid resource type", + skip: false, + expectFail: true, + functionImage: "apply-replacements", + kubeClient: fake.NewClientBuilder().WithObjects([]client.Object{&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "function-pod-template", + Namespace: defaultManagerNamespace, + }, + Data: map[string]string{ + "template": string(marshalToYamlOrPanic(&corev1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-secret", + Namespace: "test-namespace", + }, + Data: map[string][]byte{ + "test-key": []byte("test-value"), + }, + })), + }, + }}...).Build(), + namespace: "porch-fn-system", + wrapperServerImage: "wrapper-server", + imageMetadataCache: defaultImageMetadataCache, + evalFunc: defaultSuccessEvalFunc, + useGenerateName: true, + functionPodTemplateName: "function-pod-template", + managerNamespace: defaultManagerNamespace, + 
 		},
+		{
+			name:          "Function template under invalid key",
+			skip:          false,
+			expectFail:    true,
+			functionImage: "apply-replacements",
+			kubeClient: fake.NewClientBuilder().WithObjects([]client.Object{&corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "function-pod-template",
+					Namespace: defaultManagerNamespace,
+				},
+				Data: map[string]string{
+					"not-template": string(marshalToYamlOrPanic(basePodTemplate)),
+				},
+			}}...).Build(),
+			namespace:               "porch-fn-system",
+			wrapperServerImage:      "wrapper-server",
+			imageMetadataCache:      defaultImageMetadataCache,
+			evalFunc:                defaultSuccessEvalFunc,
+			useGenerateName:         true,
+			functionPodTemplateName: "function-pod-template",
+			managerNamespace:        defaultManagerNamespace,
+		},
+		{
+			name:          "Function template generates pod",
+			skip:          false,
+			expectFail:    false,
+			functionImage: "apply-replacements",
+			kubeClient: fake.NewClientBuilder().WithObjects([]client.Object{&corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "function-pod-template",
+					Namespace: defaultManagerNamespace,
+				},
+				Data: map[string]string{
+					"template": string(marshalToYamlOrPanic(basePodTemplate)),
+				},
+			}}...).Build(),
+			namespace:               "porch-fn-system",
+			wrapperServerImage:      "wrapper-server",
+			imageMetadataCache:      defaultImageMetadataCache,
+			evalFunc:                defaultSuccessEvalFunc,
+			useGenerateName:         true,
+			functionPodTemplateName: "function-pod-template",
+			managerNamespace:        defaultManagerNamespace,
+			podPatch: &corev1.Pod{
+				Status: corev1.PodStatus{
+					Phase: corev1.PodRunning,
+					Conditions: []corev1.PodCondition{
+						{
+							Type:   corev1.PodReady,
+							Status: corev1.ConditionTrue,
+						},
+					},
+					PodIP: "localhost",
+				},
+			},
+		},
+		{
+			name:          "Function template update is applied when pod is requested",
+			skip:          false,
+			expectFail:    false,
+			functionImage: "apply-replacements",
+			kubeClient: fake.NewClientBuilder().WithObjects([]client.Object{&corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "function-pod-template",
+					Namespace: defaultManagerNamespace,
+				},
+				Data: map[string]string{
+					"template": string(marshalToYamlOrPanic(basePodTemplate)),
+				},
+			},
+				&corev1.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "apply-replacements-5245a527",
+						Namespace: "porch-fn-system",
+						Labels: map[string]string{
+							krmFunctionLabel: "apply-replacements-5245a527",
+						},
+					},
+					Spec: corev1.PodSpec{
+						Containers: []corev1.Container{
+							{
+								Name:  "function",
+								Image: "apply-replacements",
+							},
+						},
+					},
+					Status: corev1.PodStatus{
+						Phase: corev1.PodRunning,
+						Conditions: []corev1.PodCondition{
+							{
+								Type:   corev1.PodReady,
+								Status: corev1.ConditionTrue,
+							},
+						},
+						PodIP: "localhost",
+					},
+				}}...).Build(),
+			namespace:               "porch-fn-system",
+			wrapperServerImage:      "wrapper-server",
+			imageMetadataCache:      defaultImageMetadataCache,
+			evalFunc:                defaultSuccessEvalFunc,
+			useGenerateName:         true,
+			functionPodTemplateName: "function-pod-template",
+			managerNamespace:        defaultManagerNamespace,
+			podPatch: &corev1.Pod{
+				Status: corev1.PodStatus{
+					Phase: corev1.PodRunning,
+					Conditions: []corev1.PodCondition{
+						{
+							Type:   corev1.PodReady,
+							Status: corev1.ConditionTrue,
+						},
+					},
+					PodIP: "localhost",
+				},
+			},
+		},
+	}
+	fakeServer := &fakeFunctionEvalServer{
+		port: defaultWrapperServerPort,
+	}
+	srvCtx, srvCancel := context.WithCancel(context.Background())
+	defer srvCancel()
+
+	err := fakeServer.Start(srvCtx)
+	if err != nil {
+		t.Fatalf("Failed to set up the gRPC server for testing: %v", err)
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.skip {
+				t.SkipNow()
+			}
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			// Set up the pod manager
+			podReadyCh := make(chan *imagePodAndGRPCClient)
+			pm := &podManager{
+				kubeClient:              tt.kubeClient,
+				namespace:               tt.namespace,
+				wrapperServerImage:      tt.wrapperServerImage,
+				imageMetadataCache:      sync.Map{},
+				podReadyCh:              podReadyCh,
+				podReadyTimeout:         1 * time.Second,
+				functionPodTemplateName: tt.functionPodTemplateName,
+				managerNamespace:        tt.managerNamespace,
+			}
+
+			for k, v := range tt.imageMetadataCache {
+				pm.imageMetadataCache.Store(k, v)
+			}
+
+			fakeServer.evalFunc = tt.evalFunc
+
+			// Execute the function under test
+			go pm.getFuncEvalPodClient(ctx, tt.functionImage, time.Hour, tt.useGenerateName)
+
+			if tt.podPatch != nil {
+				go func() {
+					watchPod, err := tt.kubeClient.Watch(ctx, &corev1.PodList{}, client.InNamespace(tt.namespace))
+					if err != nil {
+						t.Errorf("Failed to watch for pods: %v", err)
+					}
+					ev := <-watchPod.ResultChan()
+					watchPod.Stop()
+
+					if tt.podPatchAfter > 0 {
+						<-time.After(tt.podPatchAfter)
+					}
+					pod := ev.Object.(*corev1.Pod)
+					// Not ideal, but fakeClient.Patch doesn't seem to do merging correctly
+					pod.Status = tt.podPatch.Status
+					err = pm.kubeClient.Status().Update(ctx, pod)
+					if err != nil {
+						t.Errorf("Failed to patch pod: %v", err)
+					}
+				}()
+			}
+
+			cc := <-podReadyCh
+			if cc.err != nil && !tt.expectFail {
+				t.Errorf("Expected to get ready pod, got error: %v", cc.err)
+			} else if cc.err == nil {
+				if tt.expectFail {
+					t.Errorf("Expected to get error, got ready pod")
+				}
+				var pod corev1.Pod
+				if err := tt.kubeClient.Get(ctx, cc.pod, &pod); err != nil {
+					t.Errorf("Failed to get pod: %v", err)
+				}
+
+				if !strings.HasPrefix(pod.Labels[krmFunctionLabel], tt.functionImage) {
+					t.Errorf("Expected pod to have label starting with %s, got %s", tt.functionImage, pod.Labels[krmFunctionLabel])
+				}
+				if pod.Spec.Containers[0].Image != tt.functionImage {
+					t.Errorf("Expected pod to have image %s, got %s", tt.functionImage, pod.Spec.Containers[0].Image)
+				}
+			}
+		})
+	}
+}
+
+// The fake client handles pod patches incorrectly when the pod doesn't exist, so fall back to Create in that case.
+func fakeClientPatchFixInterceptor(ctx context.Context, kubeClient client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
+	if obj.GetObjectKind().GroupVersionKind().Kind == "Pod" {
+		var canary corev1.Pod
+		err := kubeClient.Get(ctx, client.ObjectKeyFromObject(obj), &canary)
+		if err != nil {
+			if errors.IsNotFound(err) {
+				err = kubeClient.Create(ctx, obj)
+				if err != nil {
+					return err
+				}
+				return nil
+			}
+			return err
+		}
+	}
+	return nil
+}
+
+func marshalToYamlOrPanic(obj interface{}) []byte {
+	data, err := yaml.Marshal(obj)
+	if err != nil {
+		panic(err)
+	}
+	return data
+}
diff --git a/func/server/server.go b/func/server/server.go
index 69d9caaa..cb7e9661 100644
--- a/func/server/server.go
+++ b/func/server/server.go
@@ -38,14 +38,15 @@ const (
 )

 var (
-	port              = flag.Int("port", 9445, "The server port")
-	functions         = flag.String("functions", "./functions", "Path to cached functions.")
-	config            = flag.String("config", "./config.yaml", "Path to the config file.")
-	podCacheConfig    = flag.String("pod-cache-config", "/pod-cache-config/pod-cache-config.yaml", "Path to the pod cache config file. The file is map of function name to TTL.")
-	podNamespace      = flag.String("pod-namespace", "porch-fn-system", "Namespace to run KRM functions pods.")
-	podTTL            = flag.Duration("pod-ttl", 30*time.Minute, "TTL for pods before GC.")
-	scanInterval      = flag.Duration("scan-interval", time.Minute, "The interval of GC between scans.")
-	disableRuntimes   = flag.String("disable-runtimes", "", fmt.Sprintf("The runtime(s) to disable. Multiple runtimes should separated by `,`. Available runtimes: `%v`, `%v`.", execRuntime, podRuntime))
+	port                    = flag.Int("port", 9445, "The server port")
+	functions               = flag.String("functions", "./functions", "Path to cached functions.")
+	config                  = flag.String("config", "./config.yaml", "Path to the config file.")
+	podCacheConfig          = flag.String("pod-cache-config", "/pod-cache-config/pod-cache-config.yaml", "Path to the pod cache config file. The file is a map of function name to TTL.")
+	podNamespace            = flag.String("pod-namespace", "porch-fn-system", "Namespace to run KRM functions pods.")
+	podTTL                  = flag.Duration("pod-ttl", 30*time.Minute, "TTL for pods before GC.")
+	scanInterval            = flag.Duration("scan-interval", time.Minute, "The interval of GC between scans.")
+	disableRuntimes         = flag.String("disable-runtimes", "", fmt.Sprintf("The runtime(s) to disable. Multiple runtimes should be separated by `,`. Available runtimes: `%v`, `%v`.", execRuntime, podRuntime))
+	functionPodTemplateName = flag.String("function-pod-template", "", "Name of the ConfigMap that contains the function pod template")
 )

 func main() {
@@ -88,7 +89,7 @@ func run() error {
 		if wrapperServerImage == "" {
 			return fmt.Errorf("environment variable %v must be set to use pod function evaluator runtime", wrapperServerImageEnv)
 		}
-		podEval, err := internal.NewPodEvaluator(*podNamespace, wrapperServerImage, *scanInterval, *podTTL, *podCacheConfig)
+		podEval, err := internal.NewPodEvaluator(*podNamespace, wrapperServerImage, *scanInterval, *podTTL, *podCacheConfig, *functionPodTemplateName)
 		if err != nil {
 			return fmt.Errorf("failed to initialize pod evaluator: %w", err)
 		}
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 5414e56a..c851ad05 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -18,6 +18,7 @@ import (
 	"crypto/sha1"
 	"encoding/hex"
 	"fmt"
+	"os"
 )

 // KubernetesName returns the passed id if it less than maxLen, otherwise
@@ -40,3 +41,11 @@ func KubernetesName(id string, hashLen, maxLen int) string {
 	stubIdx := maxLen - hashLen - 1
 	return fmt.Sprintf("%s-%s", id[:stubIdx], hex.EncodeToString(hash[:])[:hashLen])
 }
+
+// GetInClusterNamespace returns the namespace this process is running in,
+// read from the pod's service account mount.
+func GetInClusterNamespace() (string, error) {
+	ns, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
+	if err != nil {
+		return "", fmt.Errorf("failed to read namespace: %w", err)
+	}
+	return string(ns), nil
+}
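For illustration, here is a sketch of a customized pod template that still honors the contract described in `docs/09-function-runner-pod-templates.md`: the container is named "function", its entrypoint starts the wrapper server from the shared volume, and its image and arguments are left for the pod manager to patch. The node selector and resource values below are illustrative only and are not part of this change:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kpt-function-eval-pod-template
data:
  template: |
    apiVersion: v1
    kind: Pod
    metadata:
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      # illustrative scheduling constraint
      nodeSelector:
        disktype: ssd
      initContainers:
        - name: copy-wrapper-server
          image: gcr.io/example-google-project-id/porch-wrapper-server:latest
          command: ["cp", "-a", "/wrapper-server/.", "/wrapper-server-tools"]
          volumeMounts:
            - name: wrapper-server-tools
              mountPath: /wrapper-server-tools
      containers:
        # contract: the container must be named "function"
        - name: function
          # placeholder; the pod manager patches in the kpt function image
          image: image-replaced-by-kpt-func-image
          # contract: the entrypoint starts the wrapper gRPC server
          command:
            - /wrapper-server-tools/wrapper-server
          # illustrative resource constraints
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
          volumeMounts:
            - name: wrapper-server-tools
              mountPath: /wrapper-server-tools
      volumes:
        - name: wrapper-server-tools
          emptyDir: {}
```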