From d1badea68d02c4e239522bef634bb7ce4e73f80a Mon Sep 17 00:00:00 2001 From: lucklypriyansh-2 Date: Mon, 16 Oct 2023 10:15:44 +0000 Subject: [PATCH] Implementation of pod notifier (#21) --- cmd/virtual-kubelet/main.go | 2 +- internal/models/api_error.go | 31 ++++++ internal/provider/pods_tracker.go | 150 ++++++++++++++++++++++++++++++ internal/provider/provider.go | 26 +++++- 4 files changed, 204 insertions(+), 5 deletions(-) create mode 100644 internal/models/api_error.go create mode 100644 internal/provider/pods_tracker.go diff --git a/cmd/virtual-kubelet/main.go b/cmd/virtual-kubelet/main.go index 6ca4bd1..a52e440 100644 --- a/cmd/virtual-kubelet/main.go +++ b/cmd/virtual-kubelet/main.go @@ -117,7 +117,7 @@ func runNode(ctx context.Context) error { } func newSaladCloudProvider(ctx context.Context, pc nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) { - p, err := provider.NewSaladCloudProvider(ctx, inputs) + p, err := provider.NewSaladCloudProvider(ctx, inputs, pc) if err != nil { logrus.WithError(err).Error("Failed to create SaladCloud provider") return nil, nil, err diff --git a/internal/models/api_error.go b/internal/models/api_error.go new file mode 100644 index 0000000..cdf054c --- /dev/null +++ b/internal/models/api_error.go @@ -0,0 +1,31 @@ +package models + +import ( + "fmt" + "net/http" +) + +type APIError struct { + StatusCode int + Message string +} + +func (a *APIError) Error() string { + message := fmt.Sprintf( + "a %d error was returned from SaladCloud: \"%s\"", + a.StatusCode, + a.Message, + ) + + return message +} + +func NewSaladCloudError(err error, response *http.Response) error { + if response == nil { + return err + } + return &APIError{ + StatusCode: response.StatusCode, + Message: err.Error(), + } +} diff --git a/internal/provider/pods_tracker.go b/internal/provider/pods_tracker.go new file mode 100644 index 0000000..da9eb93 --- /dev/null +++ b/internal/provider/pods_tracker.go @@ -0,0 +1,150 @@ +package provider + +import ( + "context" + "errors" + "github.com/SaladTechnologies/virtual-kubelet-saladcloud/internal/models" + "github.com/SaladTechnologies/virtual-kubelet-saladcloud/internal/utils" + "github.com/virtual-kubelet/virtual-kubelet/log" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + corev1listers "k8s.io/client-go/listers/core/v1" + "net/http" + "time" +) + +// Define the intervals for pod status updates and stale pod cleanup +var podStatusUpdateInterval = 5 * time.Second +var stalePodCleanupInterval = 5 * time.Minute + +type PodsTrackerHandler interface { + GetPods(ctx context.Context) ([]*corev1.Pod, error) + GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) + DeletePod(ctx context.Context, pod *corev1.Pod) error +} + +type PodsTracker struct { + ctx context.Context + logger log.Logger + podLister corev1listers.PodLister + updateCallback func(*corev1.Pod) + handler PodsTrackerHandler +} + +func (pt *PodsTracker) BeginPodTracking(ctx context.Context) { + statusUpdatesTimer := time.NewTimer(podStatusUpdateInterval) + cleanupTimer := time.NewTimer(stalePodCleanupInterval) + defer statusUpdatesTimer.Stop() + defer cleanupTimer.Stop() + + for { + select { + case <-ctx.Done(): + log.G(ctx).WithError(ctx.Err()).Debug("Pod status update loop exiting") + return + case <-statusUpdatesTimer.C: + pt.updatePods() + statusUpdatesTimer.Reset(podStatusUpdateInterval) + case <-cleanupTimer.C: + pt.removeStalePods() + cleanupTimer.Reset(stalePodCleanupInterval) + } + } +} + +func (pt *PodsTracker) updatePods() { + pt.logger.Debug("Pod notifier update pods called") + k8sPods, err := pt.podLister.List(labels.Everything()) + if err != nil { + pt.logger.WithError(err).Errorf("failed to retrieve pods list") + return + } + for _, pod := range k8sPods { + updatedPod := pod.DeepCopy() + ok := pt.handlePodUpdates(updatedPod) + if ok { + pt.updateCallback(updatedPod) + } + } +} + +func (pt *PodsTracker) removeStalePods() { + pt.logger.Debug("remove stale Pods from cluster") + clusterPods, err := pt.podLister.List(labels.Everything()) + if err != nil { + pt.logger.WithError(err).Errorf("removeStalePodsInCluster: failed to retrieve pods list") + return + } + activePods, err := pt.handler.GetPods(pt.ctx) + if err != nil { + pt.logger.WithError(err).Errorf("removeStalePodsInCluster: failed to retrieve active container groups") + return + } + clusterPodMap := make(map[string]bool) + for _, pod := range clusterPods { + key := utils.GetPodName(pod.Namespace, pod.Name, pod) + clusterPodMap[key] = true + } + for i := range activePods { + if _, exists := clusterPodMap[activePods[i].Spec.Containers[0].Name]; !exists { + pt.logger.Debugf("removeStalePodsInCluster: removing stale pod: %s", activePods[i].Name) + err := pt.handler.DeletePod(pt.ctx, activePods[i]) + if err != nil { + pt.logger.WithError(err).Errorf("removeStalePodsInCluster: failed to remove stale pod %v", activePods[i].Name) + } + } + } +} + +func (pt *PodsTracker) handlePodUpdates(pod *corev1.Pod) bool { + pt.logger.Debug("Processing Pod Updates") + if pt.isPodStatusUpdateRequired(pod) { + pt.logger.Infof("handlePodStatusUpdate: Skipping pod status update for pod %s", pod.Name) + return false + } + podCurrentStatus, err := pt.handler.GetPodStatus(pt.ctx, pod.Namespace, pod.Name) + if err == nil && podCurrentStatus != nil { + podCurrentStatus.DeepCopyInto(&pod.Status) + return true + } + if err != nil { + var apiError *models.APIError + if errors.As(err, &apiError) && pod.Status.Phase == corev1.PodRunning && apiError.StatusCode == http.StatusNotFound { + return pt.handlePodNotFound(pod) + } + pt.logger.WithError(err).Errorf("handlePodStatusUpdate: Failed to retrieve pod %v status from provider", pod.Name) + return false + } + return true +} + +func (pt *PodsTracker) handlePodNotFound(pod *corev1.Pod) bool { + pt.logger.Infof("handlePodNotFound: Pod %s not found on the provider, updating status", pod.Name) + for i := range pod.Status.ContainerStatuses { + pod.Status.Phase = corev1.PodFailed + pod.Status.Reason = "NotFoundOnProvider" + pod.Status.Message = "The container group has been deleted" + now := metav1.NewTime(time.Now()) + if pod.Status.ContainerStatuses[i].State.Running == nil { + continue + } + pod.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{ + ExitCode: 137, + Reason: "NotFoundOnProvider", + Message: "The container group has been deleted", + FinishedAt: now, + StartedAt: pod.Status.ContainerStatuses[i].State.Running.StartedAt, + ContainerID: pod.Status.ContainerStatuses[i].ContainerID, + } + pod.Status.ContainerStatuses[i].State.Running = nil + } + return true +} + +func (pt *PodsTracker) isPodStatusUpdateRequired(pod *corev1.Pod) bool { + return pod.Status.Phase == corev1.PodSucceeded || // Pod completed its execution + pod.Status.Phase == corev1.PodFailed || + pod.Status.Reason == "ProviderFailed" || // in case if provider failed to create/register the pod + pod.DeletionTimestamp != nil // Terminating +} diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 80eb149..d7ce7ca 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -12,11 +12,13 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/log" nodeapi "github.com/virtual-kubelet/virtual-kubelet/node/api" "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" + "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" "github.com/virtual-kubelet/virtual-kubelet/trace" "io" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1listers "k8s.io/client-go/listers/core/v1" "strconv" "strings" "time" @@ -32,6 +34,8 @@ type SaladCloudProvider struct { apiClient *saladclient.APIClient countryCodes []saladclient.CountryCode logger log.Logger + podsTracker *PodsTracker + podLister corev1listers.PodLister } const ( @@ -42,11 +46,12 @@ const ( defaultOperatingSystem = "Linux" ) -func NewSaladCloudProvider(ctx context.Context, inputVars models.InputVars) (*SaladCloudProvider, error) { +func NewSaladCloudProvider(ctx context.Context, inputVars models.InputVars, providerConfig nodeutil.ProviderConfig) (*SaladCloudProvider, error) { cloudProvider := &SaladCloudProvider{ inputVars: inputVars, apiClient: saladclient.NewAPIClient(saladclient.NewConfiguration()), logger: log.G(ctx), + podLister: providerConfig.Pods, } cloudProvider.setNodeCapacity() cloudProvider.setCountryCodes([]string{"US"}) @@ -84,6 +89,18 @@ func (p *SaladCloudProvider) getNodeCapacity() corev1.ResourceList { return resourceList } +func (p *SaladCloudProvider) NotifyPods(ctx context.Context, notifierCallback func(*corev1.Pod)) { + p.logger.Debug("Notify pods set") + p.podsTracker = &PodsTracker{ + podLister: p.podLister, + updateCallback: notifierCallback, + handler: p, + ctx: ctx, + logger: p.logger, + } + go p.podsTracker.BeginPodTracking(ctx) +} + func (p *SaladCloudProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error { ctx, span := trace.StartSpan(ctx, "CreatePod") defer span.End() @@ -229,8 +246,10 @@ func (p *SaladCloudProvider) GetPodStatus(ctx context.Context, namespace string, containerGroup, response, err := p.apiClient.ContainerGroupsAPI.GetContainerGroup(p.contextWithAuth(), p.inputVars.OrganizationName, p.inputVars.ProjectName, utils.GetPodName(namespace, name, nil)).Execute() if err != nil { - p.logger.Errorf("ContainerGroupsAPI.GetPodStatus ", response) - return nil, err + p.logger.WithField("namespace", namespace). + WithField("name", name). + Errorf("ContainerGroupsAPI.GetPodStatus ", response) + return nil, models.NewSaladCloudError(err, response) } startTime := metav1.NewTime(containerGroup.CreateTime) @@ -249,7 +268,6 @@ func (p *SaladCloudProvider) GetPodStatus(ctx context.Context, namespace string, } func (p *SaladCloudProvider) GetPods(ctx context.Context) ([]*corev1.Pod, error) { - resp, r, err := p.apiClient.ContainerGroupsAPI.ListContainerGroups(p.contextWithAuth(), p.inputVars.OrganizationName, p.inputVars.ProjectName).Execute() if err != nil { p.logger.Errorf("Error when list ContainerGroupsAPI.ListContainerGroups ", r)