Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of pod notifier #21

Merged
merged 4 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/virtual-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func runNode(ctx context.Context) error {
}

func newSaladCloudProvider(ctx context.Context, pc nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) {
p, err := provider.NewSaladCloudProvider(ctx, inputs)
p, err := provider.NewSaladCloudProvider(ctx, inputs, pc)
if err != nil {
logrus.WithError(err).Error("Failed to create SaladCloud provider")
return nil, nil, err
Expand Down
31 changes: 31 additions & 0 deletions internal/models/api_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package models

import (
"fmt"
"net/http"
)

type APIError struct {
StatusCode int
Message string
}

func (a *APIError) Error() string {
message := fmt.Sprintf(
"a %d error was returned from SaladCloud: \"%s\"",
a.StatusCode,
a.Message,
)

return message
}

func NewSaladCloudError(err error, response *http.Response) error {
if response == nil {
return err
}
return &APIError{
StatusCode: response.StatusCode,
Message: err.Error(),
}
}
150 changes: 150 additions & 0 deletions internal/provider/pods_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package provider

import (
"context"
"errors"
"github.com/SaladTechnologies/virtual-kubelet-saladcloud/internal/models"
"github.com/SaladTechnologies/virtual-kubelet-saladcloud/internal/utils"
"github.com/virtual-kubelet/virtual-kubelet/log"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1listers "k8s.io/client-go/listers/core/v1"
"net/http"
"time"
)

// Define the intervals for pod status updates and stale pod cleanup
var podStatusUpdateInterval = 5 * time.Second
var stalePodCleanupInterval = 5 * time.Minute

type PodsTrackerHandler interface {
GetPods(ctx context.Context) ([]*corev1.Pod, error)
GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error)
DeletePod(ctx context.Context, pod *corev1.Pod) error
}

type PodsTracker struct {
ctx context.Context
logger log.Logger
podLister corev1listers.PodLister
updateCallback func(*corev1.Pod)
handler PodsTrackerHandler
}

func (pt *PodsTracker) BeginPodTracking(ctx context.Context) {
statusUpdatesTimer := time.NewTimer(podStatusUpdateInterval)
cleanupTimer := time.NewTimer(stalePodCleanupInterval)
defer statusUpdatesTimer.Stop()
defer cleanupTimer.Stop()

for {
select {
case <-ctx.Done():
log.G(ctx).WithError(ctx.Err()).Debug("Pod status update loop exiting")
return
case <-statusUpdatesTimer.C:
pt.updatePods()
statusUpdatesTimer.Reset(podStatusUpdateInterval)
case <-cleanupTimer.C:
pt.removeStalePods()
cleanupTimer.Reset(stalePodCleanupInterval)
}
}
}

func (pt *PodsTracker) updatePods() {
pt.logger.Debug("Pod notifier update pods called")
k8sPods, err := pt.podLister.List(labels.Everything())
if err != nil {
pt.logger.WithError(err).Errorf("failed to retrieve pods list")
return
}
for _, pod := range k8sPods {
updatedPod := pod.DeepCopy()
ok := pt.handlePodUpdates(updatedPod)
if ok {
pt.updateCallback(updatedPod)
}
}
}

func (pt *PodsTracker) removeStalePods() {
pt.logger.Debug("remove stale Pods from cluster")
clusterPods, err := pt.podLister.List(labels.Everything())
if err != nil {
pt.logger.WithError(err).Errorf("removeStalePodsInCluster: failed to retrieve pods list")
return
}
activePods, err := pt.handler.GetPods(pt.ctx)
if err != nil {
pt.logger.WithError(err).Errorf("removeStalePodsInCluster: failed to retrieve active container groups")
return
}
clusterPodMap := make(map[string]bool)
for _, pod := range clusterPods {
key := utils.GetPodName(pod.Namespace, pod.Name, pod)
clusterPodMap[key] = true
}
for i := range activePods {
if _, exists := clusterPodMap[activePods[i].Spec.Containers[0].Name]; !exists {
pt.logger.Debugf("removeStalePodsInCluster: removing stale pod: %s", activePods[i].Name)
err := pt.handler.DeletePod(pt.ctx, activePods[i])
if err != nil {
pt.logger.WithError(err).Errorf("removeStalePodsInCluster: failed to remove stale pod %v", activePods[i].Name)
}
}
}
}

func (pt *PodsTracker) handlePodUpdates(pod *corev1.Pod) bool {
pt.logger.Debug("Processing Pod Updates")
if pt.isPodStatusUpdateRequired(pod) {
pt.logger.Infof("handlePodStatusUpdate: Skipping pod status update for pod %s", pod.Name)
return false
}
podCurrentStatus, err := pt.handler.GetPodStatus(pt.ctx, pod.Namespace, pod.Name)
if err == nil && podCurrentStatus != nil {
podCurrentStatus.DeepCopyInto(&pod.Status)
return true
}
if err != nil {
var apiError *models.APIError
if errors.As(err, &apiError) && pod.Status.Phase == corev1.PodRunning && apiError.StatusCode == http.StatusNotFound {
return pt.handlePodNotFound(pod)
}
pt.logger.WithError(err).Errorf("handlePodStatusUpdate: Failed to retrieve pod %v status from provider", pod.Name)
return false
}
return true
}

func (pt *PodsTracker) handlePodNotFound(pod *corev1.Pod) bool {
pt.logger.Infof("handlePodNotFound: Pod %s not found on the provider, updating status", pod.Name)
for i := range pod.Status.ContainerStatuses {
pod.Status.Phase = corev1.PodFailed
pod.Status.Reason = "NotFoundOnProvider"
pod.Status.Message = "The container group has been deleted"
now := metav1.NewTime(time.Now())
if pod.Status.ContainerStatuses[i].State.Running == nil {
continue
}
pod.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
ExitCode: 137,
Reason: "NotFoundOnProvider",
Message: "The container group has been deleted",
FinishedAt: now,
StartedAt: pod.Status.ContainerStatuses[i].State.Running.StartedAt,
ContainerID: pod.Status.ContainerStatuses[i].ContainerID,
}
pod.Status.ContainerStatuses[i].State.Running = nil
}
return true
}

func (pt *PodsTracker) isPodStatusUpdateRequired(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodSucceeded || // Pod completed its execution
pod.Status.Phase == corev1.PodFailed ||
pod.Status.Reason == "ProviderFailed" || // in case if provider failed to create/register the pod
pod.DeletionTimestamp != nil // Terminating
}
26 changes: 22 additions & 4 deletions internal/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/log"
nodeapi "github.com/virtual-kubelet/virtual-kubelet/node/api"
"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"io"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"strconv"
"strings"
"time"
Expand All @@ -32,6 +34,8 @@ type SaladCloudProvider struct {
apiClient *saladclient.APIClient
countryCodes []saladclient.CountryCode
logger log.Logger
podsTracker *PodsTracker
podLister corev1listers.PodLister
}

const (
Expand All @@ -42,11 +46,12 @@ const (
defaultOperatingSystem = "Linux"
)

func NewSaladCloudProvider(ctx context.Context, inputVars models.InputVars) (*SaladCloudProvider, error) {
func NewSaladCloudProvider(ctx context.Context, inputVars models.InputVars, providerConfig nodeutil.ProviderConfig) (*SaladCloudProvider, error) {
cloudProvider := &SaladCloudProvider{
inputVars: inputVars,
apiClient: saladclient.NewAPIClient(saladclient.NewConfiguration()),
logger: log.G(ctx),
podLister: providerConfig.Pods,
}
cloudProvider.setNodeCapacity()
cloudProvider.setCountryCodes([]string{"US"})
Expand Down Expand Up @@ -84,6 +89,18 @@ func (p *SaladCloudProvider) getNodeCapacity() corev1.ResourceList {
return resourceList
}

func (p *SaladCloudProvider) NotifyPods(ctx context.Context, notifierCallback func(*corev1.Pod)) {
p.logger.Debug("Notify pods set")
p.podsTracker = &PodsTracker{
podLister: p.podLister,
updateCallback: notifierCallback,
handler: p,
ctx: ctx,
logger: p.logger,
}
go p.podsTracker.BeginPodTracking(ctx)
}

func (p *SaladCloudProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "CreatePod")
defer span.End()
Expand Down Expand Up @@ -229,8 +246,10 @@ func (p *SaladCloudProvider) GetPodStatus(ctx context.Context, namespace string,

containerGroup, response, err := p.apiClient.ContainerGroupsAPI.GetContainerGroup(p.contextWithAuth(), p.inputVars.OrganizationName, p.inputVars.ProjectName, utils.GetPodName(namespace, name, nil)).Execute()
if err != nil {
p.logger.Errorf("ContainerGroupsAPI.GetPodStatus ", response)
return nil, err
p.logger.WithField("namespace", namespace).
WithField("name", name).
Errorf("ContainerGroupsAPI.GetPodStatus ", response)
return nil, models.NewSaladCloudError(err, response)
}

startTime := metav1.NewTime(containerGroup.CreateTime)
Expand All @@ -249,7 +268,6 @@ func (p *SaladCloudProvider) GetPodStatus(ctx context.Context, namespace string,
}

func (p *SaladCloudProvider) GetPods(ctx context.Context) ([]*corev1.Pod, error) {

resp, r, err := p.apiClient.ContainerGroupsAPI.ListContainerGroups(p.contextWithAuth(), p.inputVars.OrganizationName, p.inputVars.ProjectName).Execute()
if err != nil {
p.logger.Errorf("Error when list ContainerGroupsAPI.ListContainerGroups ", r)
Expand Down