Skip to content

Commit

Permalink
Fixed namespace issue, added more test cases to cover newly introduce…
Browse files Browse the repository at this point in the history
…d parts
  • Loading branch information
nagygergo committed May 28, 2024
1 parent fdd0b59 commit 46240cb
Show file tree
Hide file tree
Showing 3 changed files with 375 additions and 42 deletions.
65 changes: 40 additions & 25 deletions func/internal/podevaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/nephio-project/porch/func/evaluator"
util "github.com/nephio-project/porch/pkg/util"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gopkg.in/yaml.v2"
Expand All @@ -53,6 +54,7 @@ const (
reclaimAfterAnnotation = "fn.kpt.dev/reclaim-after"
fieldManagerName = "krm-function-runner"
functionContainerName = "function"
defaultManagerNamespace = "porch-system"

channelBufferSize = 128
)
Expand Down Expand Up @@ -81,6 +83,13 @@ func NewPodEvaluator(namespace, wrapperServerImage string, interval, ttl time.Du
return nil, fmt.Errorf("failed to create client: %w", err)
}

managerNs, err := util.GetInClusterNamespace()
if err != nil {
klog.Errorf("failed to get the namespace where the function-runner is running: %v", err)
klog.Warningf("unable to get the namespace where the function-runner is running, assuming it's a test setup, defaulting to : %v", defaultManagerNamespace)
managerNs = defaultManagerNamespace
}

reqCh := make(chan *clientConnRequest, channelBufferSize)
readyCh := make(chan *imagePodAndGRPCClient, channelBufferSize)

Expand All @@ -101,6 +110,7 @@ func NewPodEvaluator(namespace, wrapperServerImage string, interval, ttl time.Du
podReadyCh: readyCh,
functionPodTemplateName: functionPodTemplateName,
podReadyTimeout: 60 * time.Second,
managerNamespace: managerNs,
},
},
}
Expand Down Expand Up @@ -386,6 +396,10 @@ type podManager struct {
kubeClient client.Client
// namespace holds the namespace where the executors run
namespace string

//Namespace where the function-runner is running
managerNamespace string

// wrapperServerImage is the image name of the wrapper server
wrapperServerImage string

Expand All @@ -397,6 +411,7 @@ type podManager struct {
// Its underlying type is map[string]*digestAndEntrypoint.
imageMetadataCache sync.Map

// podReadyTimeout is the timeout podManager will wait for the pod to be ready before reporting an error
podReadyTimeout time.Duration

// The name of the configmap in the same namespace as the function-runner is running.
Expand Down Expand Up @@ -453,7 +468,7 @@ func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string, tt
func (pm *podManager) imageDigestAndEntrypoint(ctx context.Context, image string) (*digestAndEntrypoint, error) {
start := time.Now()
defer func() {
klog.Infof("getting image metadata for %v took %v", image, time.Now().Sub(start))
klog.Infof("getting image metadata for %v took %v", image, time.Since(start))
}()
var entrypoint []string
ref, err := name.ParseReference(image)
Expand Down Expand Up @@ -528,10 +543,12 @@ func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string, ttl

pod, err := pm.getBasePodTemplate(ctx)
if err != nil {
return client.ObjectKey{}, fmt.Errorf("Failed to generate a base pod template")
return client.ObjectKey{}, fmt.Errorf("failed to generate a base pod template: %w", err)
}
err = pm.patchNewPodContainer(pod, *de, image)
if err != nil {
return client.ObjectKey{}, fmt.Errorf("unable to apply the pod: %w", err)
}
pm.patchNewPodImage(pod, image)
pm.patchNewPodArgs(pod, *de)
pm.patchNewPodMetadata(pod, ttl, podId)

// Server-side apply doesn't support name generation. We have to use Create
Expand All @@ -554,20 +571,29 @@ func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string, ttl
return client.ObjectKeyFromObject(pod), nil
}

// Either gets the pod template from configmap, or aligns it to the
// Either gets the pod template from configmap, or from an inlined pod template.
func (pm *podManager) getBasePodTemplate(ctx context.Context) (*corev1.Pod, error) {
if pm.functionPodTemplateName != "" {
podTemplateCm := &corev1.ConfigMap{}

err := pm.kubeClient.Get(ctx, client.ObjectKey{
Name: pm.functionPodTemplateName,
Name: pm.functionPodTemplateName,
Namespace: pm.managerNamespace,
}, podTemplateCm)
if err != nil {
klog.Errorf("Could not get Configmap containing function pod template: %s", pm.functionPodTemplateName)
klog.Errorf("Could not get Configmap containing function pod template: %s/%s", pm.managerNamespace, pm.functionPodTemplateName)
return nil, err
}

decoder := yamlutil.NewYAMLOrJSONDecoder(strings.NewReader(podTemplateCm.Data["template"]), 100)
var basePodTemplate corev1.Pod
decoder.Decode(&basePodTemplate)
err = decoder.Decode(&basePodTemplate)

if err != nil {
klog.Errorf("Could not decode function pod template: %s", pm.functionPodTemplateName)
return nil, fmt.Errorf("unable to decode function pod template: %w", err)
}

return &basePodTemplate, nil
} else {

Expand Down Expand Up @@ -640,8 +666,8 @@ func (pm *podManager) getBasePodTemplate(ctx context.Context) (*corev1.Pod, erro
}
}

// Patches the expected port, and the original entrypoint of the kpt function into the function container
func (pm *podManager) patchNewPodArgs(pod *corev1.Pod, de digestAndEntrypoint) error {
// Patches the expected port, and the original entrypoint and image of the kpt function into the function container
func (pm *podManager) patchNewPodContainer(pod *corev1.Pod, de digestAndEntrypoint, image string) error {
var patchedContainer bool
for _, container := range pod.Spec.Containers {
if container.Name == functionContainerName {
Expand All @@ -650,24 +676,12 @@ func (pm *podManager) patchNewPodArgs(pod *corev1.Pod, de digestAndEntrypoint) e
"--",
)
container.Args = append(container.Args, de.entrypoint...)
patchedContainer = true
}
}
if !patchedContainer {
return fmt.Errorf("haven't found the function container in the pod")
}
return nil
}

func (pm *podManager) patchNewPodImage(pod *corev1.Pod, image string) error {
var patchedContainer bool
for _, container := range pod.Spec.Containers {
if container.Name == functionContainerName {
container.Image = image
patchedContainer = true
}
}
if !patchedContainer {
return fmt.Errorf("haven't found the function container in the pod")
return fmt.Errorf("failed to find the %v container in the pod", functionContainerName)
}
return nil
}
Expand All @@ -694,7 +708,8 @@ func (pm *podManager) patchNewPodMetadata(pod *corev1.Pod, ttl time.Duration, po
func (pm *podManager) podIpIfRunningAndReady(ctx context.Context, podKey client.ObjectKey) (ip string, e error) {
var pod corev1.Pod
// Wait until the pod is Running
if e := wait.PollImmediate(100*time.Millisecond, pm.podReadyTimeout, func() (done bool, err error) {

if e := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, pm.podReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
err = pm.kubeClient.Get(ctx, podKey, &pod)
if err != nil {
return false, err
Expand Down
Loading

0 comments on commit 46240cb

Please sign in to comment.