Implementation of pod notifier (#21)
lucklypriyansh-2 authored Oct 16, 2023
1 parent 47dd29b commit d1badea
Showing 4 changed files with 204 additions and 5 deletions.
2 changes: 1 addition & 1 deletion cmd/virtual-kubelet/main.go
@@ -117,7 +117,7 @@ func runNode(ctx context.Context) error {
}

func newSaladCloudProvider(ctx context.Context, pc nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) {
-   p, err := provider.NewSaladCloudProvider(ctx, inputs)
+   p, err := provider.NewSaladCloudProvider(ctx, inputs, pc)
    if err != nil {
        logrus.WithError(err).Error("Failed to create SaladCloud provider")
        return nil, nil, err
31 changes: 31 additions & 0 deletions internal/models/api_error.go
@@ -0,0 +1,31 @@
package models

import (
    "fmt"
    "net/http"
)

type APIError struct {
    StatusCode int
    Message    string
}

func (a *APIError) Error() string {
    message := fmt.Sprintf(
        "a %d error was returned from SaladCloud: \"%s\"",
        a.StatusCode,
        a.Message,
    )

    return message
}

func NewSaladCloudError(err error, response *http.Response) error {
    if response == nil {
        return err
    }
    return &APIError{
        StatusCode: response.StatusCode,
        Message:    err.Error(),
    }
}
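
For context: a minimal, self-contained sketch of how a caller can branch on the wrapped status code with errors.As. The standalone main below is illustrative only, not part of this commit.

package main

import (
    "errors"
    "fmt"
    "net/http"

    "github.com/SaladTechnologies/virtual-kubelet-saladcloud/internal/models"
)

func main() {
    // Simulate a failed SaladCloud call that returned a 404.
    err := models.NewSaladCloudError(
        errors.New("container group not found"),
        &http.Response{StatusCode: http.StatusNotFound},
    )

    // Distinguish "deleted on the provider" from other failures.
    var apiErr *models.APIError
    if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusNotFound {
        fmt.Println("pod no longer exists on SaladCloud:", apiErr)
    }
}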
150 changes: 150 additions & 0 deletions internal/provider/pods_tracker.go
@@ -0,0 +1,150 @@
package provider

import (
    "context"
    "errors"
    "net/http"
    "time"

    "github.com/SaladTechnologies/virtual-kubelet-saladcloud/internal/models"
    "github.com/SaladTechnologies/virtual-kubelet-saladcloud/internal/utils"
    "github.com/virtual-kubelet/virtual-kubelet/log"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    corev1listers "k8s.io/client-go/listers/core/v1"
)

// Intervals for pod status updates and stale pod cleanup.
var (
    podStatusUpdateInterval = 5 * time.Second
    stalePodCleanupInterval = 5 * time.Minute
)

type PodsTrackerHandler interface {
    GetPods(ctx context.Context) ([]*corev1.Pod, error)
    GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error)
    DeletePod(ctx context.Context, pod *corev1.Pod) error
}

type PodsTracker struct {
    ctx            context.Context
    logger         log.Logger
    podLister      corev1listers.PodLister
    updateCallback func(*corev1.Pod)
    handler        PodsTrackerHandler
}

func (pt *PodsTracker) BeginPodTracking(ctx context.Context) {
    statusUpdatesTimer := time.NewTimer(podStatusUpdateInterval)
    cleanupTimer := time.NewTimer(stalePodCleanupInterval)
    defer statusUpdatesTimer.Stop()
    defer cleanupTimer.Stop()

    for {
        select {
        case <-ctx.Done():
            log.G(ctx).WithError(ctx.Err()).Debug("Pod status update loop exiting")
            return
        case <-statusUpdatesTimer.C:
            pt.updatePods()
            statusUpdatesTimer.Reset(podStatusUpdateInterval)
        case <-cleanupTimer.C:
            pt.removeStalePods()
            cleanupTimer.Reset(stalePodCleanupInterval)
        }
    }
}

func (pt *PodsTracker) updatePods() {
    pt.logger.Debug("updatePods: refreshing pod statuses from the provider")
    k8sPods, err := pt.podLister.List(labels.Everything())
    if err != nil {
        pt.logger.WithError(err).Error("updatePods: failed to retrieve pods list")
        return
    }
    for _, pod := range k8sPods {
        updatedPod := pod.DeepCopy()
        if pt.handlePodUpdates(updatedPod) {
            pt.updateCallback(updatedPod)
        }
    }
}

func (pt *PodsTracker) removeStalePods() {
    pt.logger.Debug("removeStalePods: removing container groups with no matching cluster pod")
    clusterPods, err := pt.podLister.List(labels.Everything())
    if err != nil {
        pt.logger.WithError(err).Error("removeStalePods: failed to retrieve pods list")
        return
    }
    activePods, err := pt.handler.GetPods(pt.ctx)
    if err != nil {
        pt.logger.WithError(err).Error("removeStalePods: failed to retrieve active container groups")
        return
    }
    clusterPodMap := make(map[string]bool)
    for _, pod := range clusterPods {
        key := utils.GetPodName(pod.Namespace, pod.Name, pod)
        clusterPodMap[key] = true
    }
    for i := range activePods {
        // GetPods builds each pod from a container group whose single
        // container carries the utils.GetPodName key, so the container
        // name doubles as the lookup key here.
        if _, exists := clusterPodMap[activePods[i].Spec.Containers[0].Name]; !exists {
            pt.logger.Debugf("removeStalePods: removing stale pod %s", activePods[i].Name)
            err := pt.handler.DeletePod(pt.ctx, activePods[i])
            if err != nil {
                pt.logger.WithError(err).Errorf("removeStalePods: failed to remove stale pod %s", activePods[i].Name)
            }
        }
    }
}

func (pt *PodsTracker) handlePodUpdates(pod *corev1.Pod) bool {
    pt.logger.Debug("handlePodUpdates: processing pod updates")
    if pt.isPodInTerminalState(pod) {
        pt.logger.Infof("handlePodUpdates: skipping status update for terminal pod %s", pod.Name)
        return false
    }
    podCurrentStatus, err := pt.handler.GetPodStatus(pt.ctx, pod.Namespace, pod.Name)
    if err == nil && podCurrentStatus != nil {
        podCurrentStatus.DeepCopyInto(&pod.Status)
        return true
    }
    if err != nil {
        var apiError *models.APIError
        if errors.As(err, &apiError) && pod.Status.Phase == corev1.PodRunning && apiError.StatusCode == http.StatusNotFound {
            return pt.handlePodNotFound(pod)
        }
        pt.logger.WithError(err).Errorf("handlePodUpdates: failed to retrieve status of pod %s from provider", pod.Name)
        return false
    }
    return true
}

func (pt *PodsTracker) handlePodNotFound(pod *corev1.Pod) bool {
    pt.logger.Infof("handlePodNotFound: pod %s not found on the provider, marking it failed", pod.Name)
    pod.Status.Phase = corev1.PodFailed
    pod.Status.Reason = "NotFoundOnProvider"
    pod.Status.Message = "The container group has been deleted"
    now := metav1.NewTime(time.Now())
    for i := range pod.Status.ContainerStatuses {
        if pod.Status.ContainerStatuses[i].State.Running == nil {
            continue
        }
        pod.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
            ExitCode:    137,
            Reason:      "NotFoundOnProvider",
            Message:     "The container group has been deleted",
            FinishedAt:  now,
            StartedAt:   pod.Status.ContainerStatuses[i].State.Running.StartedAt,
            ContainerID: pod.Status.ContainerStatuses[i].ContainerID,
        }
        pod.Status.ContainerStatuses[i].State.Running = nil
    }
    return true
}

// isPodInTerminalState reports whether the pod has already reached a
// state in which no further status updates from the provider are needed.
func (pt *PodsTracker) isPodInTerminalState(pod *corev1.Pod) bool {
    return pod.Status.Phase == corev1.PodSucceeded || // pod completed its execution
        pod.Status.Phase == corev1.PodFailed ||
        pod.Status.Reason == "ProviderFailed" || // the provider failed to create/register the pod
        pod.DeletionTimestamp != nil // pod is terminating
}
26 changes: 22 additions & 4 deletions internal/provider/provider.go
@@ -12,11 +12,13 @@ import (
    "github.com/virtual-kubelet/virtual-kubelet/log"
    nodeapi "github.com/virtual-kubelet/virtual-kubelet/node/api"
    "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
+   "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
    "github.com/virtual-kubelet/virtual-kubelet/trace"
    "io"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+   corev1listers "k8s.io/client-go/listers/core/v1"
    "strconv"
    "strings"
    "time"
@@ -32,6 +34,8 @@ type SaladCloudProvider struct {
    apiClient    *saladclient.APIClient
    countryCodes []saladclient.CountryCode
    logger       log.Logger
+   podsTracker  *PodsTracker
+   podLister    corev1listers.PodLister
}

const (
@@ -42,11 +46,12 @@ const (
    defaultOperatingSystem = "Linux"
)

-func NewSaladCloudProvider(ctx context.Context, inputVars models.InputVars) (*SaladCloudProvider, error) {
+func NewSaladCloudProvider(ctx context.Context, inputVars models.InputVars, providerConfig nodeutil.ProviderConfig) (*SaladCloudProvider, error) {
    cloudProvider := &SaladCloudProvider{
        inputVars: inputVars,
        apiClient: saladclient.NewAPIClient(saladclient.NewConfiguration()),
        logger:    log.G(ctx),
+       podLister: providerConfig.Pods,
    }
    cloudProvider.setNodeCapacity()
    cloudProvider.setCountryCodes([]string{"US"})
@@ -84,6 +89,18 @@ func (p *SaladCloudProvider) getNodeCapacity() corev1.ResourceList {
    return resourceList
}

+func (p *SaladCloudProvider) NotifyPods(ctx context.Context, notifierCallback func(*corev1.Pod)) {
+   p.logger.Debug("NotifyPods: registering pod update callback")
+   p.podsTracker = &PodsTracker{
+       podLister:      p.podLister,
+       updateCallback: notifierCallback,
+       handler:        p,
+       ctx:            ctx,
+       logger:         p.logger,
+   }
+   go p.podsTracker.BeginPodTracking(ctx)
+}
+
func (p *SaladCloudProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
    ctx, span := trace.StartSpan(ctx, "CreatePod")
    defer span.End()
@@ -229,8 +246,10 @@ func (p *SaladCloudProvider) GetPodStatus(ctx context.Context, namespace string,

    containerGroup, response, err := p.apiClient.ContainerGroupsAPI.GetContainerGroup(p.contextWithAuth(), p.inputVars.OrganizationName, p.inputVars.ProjectName, utils.GetPodName(namespace, name, nil)).Execute()
    if err != nil {
-       p.logger.Errorf("ContainerGroupsAPI.GetPodStatus ", response)
-       return nil, err
+       p.logger.WithField("namespace", namespace).
+           WithField("name", name).
+           Errorf("ContainerGroupsAPI.GetPodStatus failed: %v", response)
+       return nil, models.NewSaladCloudError(err, response)
    }

    startTime := metav1.NewTime(containerGroup.CreateTime)
@@ -249,7 +268,6 @@ func (p *SaladCloudProvider) GetPodStatus(ctx context.Context, namespace string,
}

func (p *SaladCloudProvider) GetPods(ctx context.Context) ([]*corev1.Pod, error) {
-
    resp, r, err := p.apiClient.ContainerGroupsAPI.ListContainerGroups(p.contextWithAuth(), p.inputVars.OrganizationName, p.inputVars.ProjectName).Execute()
    if err != nil {
p.logger.Errorf("Error when list ContainerGroupsAPI.ListContainerGroups ", r)
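
For context: a rough sketch of how the virtual-kubelet runtime ends up calling NotifyPods, assuming the standard nodeutil.NewNode flow in which the pod controller detects that the provider implements node.PodNotifier and hands it the update callback. Names such as the empty models.InputVars are placeholders, not the repository's actual main.go.

package main

import (
    "context"

    "github.com/SaladTechnologies/virtual-kubelet-saladcloud/internal/models"
    "github.com/SaladTechnologies/virtual-kubelet-saladcloud/internal/provider"
    "github.com/virtual-kubelet/virtual-kubelet/node"
    "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
)

func main() {
    ctx := context.Background()
    inputs := models.InputVars{} // placeholder; the real CLI populates org, project, and API key

    n, err := nodeutil.NewNode("saladcloud-node",
        func(cfg nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) {
            // cfg.Pods is the shared pod lister that NewSaladCloudProvider
            // now stores for the PodsTracker.
            p, err := provider.NewSaladCloudProvider(ctx, inputs, cfg)
            if err != nil {
                return nil, nil, err
            }
            return p, nil, nil // nil node provider falls back to nodeutil's default
        },
    )
    if err != nil {
        panic(err)
    }

    // Run starts the node and pod controllers; because SaladCloudProvider
    // implements node.PodNotifier, the pod controller calls NotifyPods
    // instead of polling, and BeginPodTracking pushes updates through
    // the supplied callback.
    if err := n.Run(ctx); err != nil {
        panic(err)
    }
}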
