From 45975409110a9ed3f373df94adaca09fe0f4c0d0 Mon Sep 17 00:00:00 2001 From: lucklypriyansh-2 Date: Wed, 11 Oct 2023 18:55:13 +0530 Subject: [PATCH 1/4] wip --- cmd/virtual-kubelet/main.go | 2 +- internal/provider/pods_tracker.go | 65 +++++++++++++++++++++++++++++++ internal/provider/provider.go | 23 ++++++++++- 3 files changed, 87 insertions(+), 3 deletions(-) create mode 100644 internal/provider/pods_tracker.go diff --git a/cmd/virtual-kubelet/main.go b/cmd/virtual-kubelet/main.go index 6ca4bd1..a52e440 100644 --- a/cmd/virtual-kubelet/main.go +++ b/cmd/virtual-kubelet/main.go @@ -117,7 +117,7 @@ func runNode(ctx context.Context) error { } func newSaladCloudProvider(ctx context.Context, pc nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) { - p, err := provider.NewSaladCloudProvider(ctx, inputs) + p, err := provider.NewSaladCloudProvider(ctx, inputs, pc) if err != nil { logrus.WithError(err).Error("Failed to create SaladCloud provider") return nil, nil, err diff --git a/internal/provider/pods_tracker.go b/internal/provider/pods_tracker.go new file mode 100644 index 0000000..dcac8d5 --- /dev/null +++ b/internal/provider/pods_tracker.go @@ -0,0 +1,65 @@ +package provider + +import ( + "context" + "github.com/virtual-kubelet/virtual-kubelet/log" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + corev1listers "k8s.io/client-go/listers/core/v1" + "time" +) + +// Define the intervals for pod status updates and stale pod cleanup +var podStatusUpdateInterval = 5 * time.Second +var stalePodCleanupInterval = 5 * time.Minute + +type PodsTrackerHandler interface { + GetPods(ctx context.Context) ([]*corev1.Pod, error) + GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) + DeletePod(ctx context.Context, pod *corev1.Pod) error +} + +type PodsTracker struct { + ctx context.Context + logger log.Logger + podLister corev1listers.PodLister + updateCallback func(*corev1.Pod) + handler PodsTrackerHandler +} + +func (pt *PodsTracker) BeginPodTracking(ctx context.Context) { + statusUpdatesTimer := time.NewTimer(podStatusUpdateInterval) + cleanupTimer := time.NewTimer(stalePodCleanupInterval) + defer statusUpdatesTimer.Stop() + defer cleanupTimer.Stop() + + for { + select { + case <-ctx.Done(): + log.G(ctx).WithError(ctx.Err()).Debug("Pod status update loop exiting") + return + case <-statusUpdatesTimer.C: + pt.updatePods() + statusUpdatesTimer.Reset(podStatusUpdateInterval) + case <-cleanupTimer.C: + pt.removeStalePods() + cleanupTimer.Reset(stalePodCleanupInterval) + } + } +} + +func (pt *PodsTracker) updatePods() { + pt.logger.Debug("Pod notifier update pods called") + k8sPods, err := pt.podLister.List(labels.Everything()) + if err != nil { + pt.logger.WithError(err).Errorf("failed to retrieve pods list") + return + } + for _, pod := range k8sPods { + pt.logger.Debug("update pod", pod.Name) + } +} + +func (pt *PodsTracker) removeStalePods() { + pt.logger.Debug("Pod notifier remove stale pods called") +} diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 80eb149..a66c3d5 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -12,11 +12,13 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/log" nodeapi "github.com/virtual-kubelet/virtual-kubelet/node/api" "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" + "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" "github.com/virtual-kubelet/virtual-kubelet/trace" "io" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1listers "k8s.io/client-go/listers/core/v1" "strconv" "strings" "time" @@ -32,6 +34,8 @@ type SaladCloudProvider struct { apiClient *saladclient.APIClient countryCodes []saladclient.CountryCode logger log.Logger + podsTracker *PodsTracker + podLister corev1listers.PodLister } const ( @@ -42,11 +46,12 @@ const ( defaultOperatingSystem = "Linux" ) -func NewSaladCloudProvider(ctx context.Context, inputVars models.InputVars) (*SaladCloudProvider, error) { +func NewSaladCloudProvider(ctx context.Context, inputVars models.InputVars, providerConfig nodeutil.ProviderConfig) (*SaladCloudProvider, error) { cloudProvider := &SaladCloudProvider{ inputVars: inputVars, apiClient: saladclient.NewAPIClient(saladclient.NewConfiguration()), logger: log.G(ctx), + podLister: providerConfig.Pods, } cloudProvider.setNodeCapacity() cloudProvider.setCountryCodes([]string{"US"}) @@ -84,6 +89,18 @@ func (p *SaladCloudProvider) getNodeCapacity() corev1.ResourceList { return resourceList } +func (p *SaladCloudProvider) NotifyPods(ctx context.Context, notifierCallback func(*corev1.Pod)) { + p.logger.Debug("Notify pods set") + p.podsTracker = &PodsTracker{ + podLister: p.podLister, + updateCallback: notifierCallback, + handler: p, + ctx: ctx, + logger: p.logger, + } + go p.podsTracker.BeginPodTracking(ctx) +} + func (p *SaladCloudProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error { ctx, span := trace.StartSpan(ctx, "CreatePod") defer span.End() @@ -229,7 +246,9 @@ func (p *SaladCloudProvider) GetPodStatus(ctx context.Context, namespace string, containerGroup, response, err := p.apiClient.ContainerGroupsAPI.GetContainerGroup(p.contextWithAuth(), p.inputVars.OrganizationName, p.inputVars.ProjectName, utils.GetPodName(namespace, name, nil)).Execute() if err != nil { - p.logger.Errorf("ContainerGroupsAPI.GetPodStatus ", response) + p.logger.WithField("namespace", namespace). + WithField("name", name). + Errorf("ContainerGroupsAPI.GetPodStatus ", response) return nil, err } From 574c9df7303316cfc2d49a3452aadfc2da0fa89d Mon Sep 17 00:00:00 2001 From: lucklypriyansh-2 Date: Thu, 12 Oct 2023 18:12:26 +0530 Subject: [PATCH 2/4] update pod notifier impl --- internal/provider/pods_tracker.go | 52 ++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/internal/provider/pods_tracker.go b/internal/provider/pods_tracker.go index dcac8d5..aadd3f3 100644 --- a/internal/provider/pods_tracker.go +++ b/internal/provider/pods_tracker.go @@ -2,8 +2,11 @@ package provider import ( "context" + "errors" + openapi "github.com/lucklypriyansh-2/salad-client" "github.com/virtual-kubelet/virtual-kubelet/log" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" corev1listers "k8s.io/client-go/listers/core/v1" "time" @@ -56,10 +59,57 @@ func (pt *PodsTracker) updatePods() { return } for _, pod := range k8sPods { - pt.logger.Debug("update pod", pod.Name) + updatedPod := pod.DeepCopy() + ok := pt.handlePodUpdates(updatedPod) + if ok { + pt.updateCallback(updatedPod) + } } } func (pt *PodsTracker) removeStalePods() { pt.logger.Debug("Pod notifier remove stale pods called") } + +func (pt *PodsTracker) handlePodUpdates(pod *corev1.Pod) bool { + if pt.isPodStatusUpdateRequired(pod) { + pt.logger.Infof("pod %s will skip pod status update", pod.Name) + return false + } + newStatus, err := pt.handler.GetPodStatus(pt.ctx, pod.Namespace, pod.Name) + if err == nil && newStatus != nil { + newStatus.DeepCopyInto(&pod.Status) + return true + } + var openApiErr openapi.GenericOpenAPIError + if pod.Status.Phase == corev1.PodRunning && errors.As(err, &openApiErr) { + pod.Status.Phase = corev1.PodFailed + pod.Status.Reason = "NotFoundOnProvider" + pod.Status.Message = "the workload has been deleted from salad cloud" + now := metav1.NewTime(time.Now()) + for i := range pod.Status.ContainerStatuses { + if pod.Status.ContainerStatuses[i].State.Running == nil { + continue + } + + pod.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{ + ExitCode: 137, + Reason: "NotFoundOnProvider", + Message: "the workload has been deleted from salad cloud", + FinishedAt: now, + StartedAt: pod.Status.ContainerStatuses[i].State.Running.StartedAt, + ContainerID: pod.Status.ContainerStatuses[i].ContainerID, + } + pod.Status.ContainerStatuses[i].State.Running = nil + } + return true + } + return false +} + +func (pt *PodsTracker) isPodStatusUpdateRequired(pod *corev1.Pod) bool { + return pod.Status.Phase == corev1.PodSucceeded || // Pod completed its execution + pod.Status.Phase == corev1.PodFailed || + pod.Status.Reason == "ProviderFailed" || // in case if provider failed to create/register the pod + pod.DeletionTimestamp != nil // Terminating +} From 00a44741cc1e5cf8ad41076a7369be0bd45c7538 Mon Sep 17 00:00:00 2001 From: lucklypriyansh-2 Date: Fri, 13 Oct 2023 16:48:25 +0530 Subject: [PATCH 3/4] trackers --- internal/models/api_error.go | 31 +++++++++++ internal/provider/pods_tracker.go | 85 ++++++++++++++++++++++--------- internal/provider/provider.go | 2 +- 3 files changed, 92 insertions(+), 26 deletions(-) create mode 100644 internal/models/api_error.go diff --git a/internal/models/api_error.go b/internal/models/api_error.go new file mode 100644 index 0000000..cdf054c --- /dev/null +++ b/internal/models/api_error.go @@ -0,0 +1,31 @@ +package models + +import ( + "fmt" + "net/http" +) + +type APIError struct { + StatusCode int + Message string +} + +func (a *APIError) Error() string { + message := fmt.Sprintf( + "a %d error was returned from SaladCloud: \"%s\"", + a.StatusCode, + a.Message, + ) + + return message +} + +func NewSaladCloudError(err error, response *http.Response) error { + if response == nil { + return err + } + return &APIError{ + StatusCode: response.StatusCode, + Message: err.Error(), + } +} diff --git a/internal/provider/pods_tracker.go b/internal/provider/pods_tracker.go index aadd3f3..186ac03 100644 --- a/internal/provider/pods_tracker.go +++ b/internal/provider/pods_tracker.go @@ -3,12 +3,14 @@ package provider import ( "context" "errors" - openapi "github.com/lucklypriyansh-2/salad-client" + "github.com/SaladTechnologies/virtual-kubelet-saladcloud/internal/models" + "github.com/SaladTechnologies/virtual-kubelet-saladcloud/internal/utils" "github.com/virtual-kubelet/virtual-kubelet/log" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" corev1listers "k8s.io/client-go/listers/core/v1" + "net/http" "time" ) @@ -68,43 +70,76 @@ func (pt *PodsTracker) updatePods() { } func (pt *PodsTracker) removeStalePods() { - pt.logger.Debug("Pod notifier remove stale pods called") + pt.logger.Debug("remove stale Pods from cluster") + clusterPods, err := pt.podLister.List(labels.Everything()) + if err != nil { + pt.logger.WithError(err).Errorf("removeStalePodsInCluster: failed to retrieve pods list") + return + } + activePods, err := pt.handler.GetPods(pt.ctx) + if err != nil { + pt.logger.WithError(err).Errorf("removeStalePodsInCluster: failed to retrieve active container groups") + return + } + clusterPodMap := make(map[string]bool) + for _, pod := range clusterPods { + key := utils.GetPodName(pod.Namespace, pod.Name, pod) + clusterPodMap[key] = true + } + for i := range activePods { + if _, exists := clusterPodMap[activePods[i].Name]; !exists { + err := pt.handler.DeletePod(pt.ctx, activePods[i]) + if err != nil { + pt.logger.WithError(err).Errorf("removeStalePodsInCluster: failed to remove stale pod %v", activePods[i].Name) + } + + } + } } func (pt *PodsTracker) handlePodUpdates(pod *corev1.Pod) bool { + pt.logger.Debug("Processing Pod Updates") if pt.isPodStatusUpdateRequired(pod) { - pt.logger.Infof("pod %s will skip pod status update", pod.Name) + pt.logger.Infof("handlePodStatusUpdate: Skipping pod status update for pod %s", pod.Name) return false } - newStatus, err := pt.handler.GetPodStatus(pt.ctx, pod.Namespace, pod.Name) - if err == nil && newStatus != nil { - newStatus.DeepCopyInto(&pod.Status) + podCurrentStatus, err := pt.handler.GetPodStatus(pt.ctx, pod.Namespace, pod.Name) + if err == nil && podCurrentStatus != nil { + podCurrentStatus.DeepCopyInto(&pod.Status) return true } - var openApiErr openapi.GenericOpenAPIError - if pod.Status.Phase == corev1.PodRunning && errors.As(err, &openApiErr) { + if err != nil { + var apiError *models.APIError + if errors.As(err, &apiError) && pod.Status.Phase == corev1.PodRunning && apiError.StatusCode == http.StatusNotFound { + return pt.handlePodNotFound(pod) + } + pt.logger.WithError(err).Errorf("handlePodStatusUpdate: Failed to retrieve pod %v status from provider", pod.Name) + return false + } + return true +} + +func (pt *PodsTracker) handlePodNotFound(pod *corev1.Pod) bool { + pt.logger.Infof("handlePodNotFound: Pod %s not found on the provider, updating status", pod.Name) + for i := range pod.Status.ContainerStatuses { pod.Status.Phase = corev1.PodFailed pod.Status.Reason = "NotFoundOnProvider" - pod.Status.Message = "the workload has been deleted from salad cloud" + pod.Status.Message = "The container group has been deleted" now := metav1.NewTime(time.Now()) - for i := range pod.Status.ContainerStatuses { - if pod.Status.ContainerStatuses[i].State.Running == nil { - continue - } - - pod.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{ - ExitCode: 137, - Reason: "NotFoundOnProvider", - Message: "the workload has been deleted from salad cloud", - FinishedAt: now, - StartedAt: pod.Status.ContainerStatuses[i].State.Running.StartedAt, - ContainerID: pod.Status.ContainerStatuses[i].ContainerID, - } - pod.Status.ContainerStatuses[i].State.Running = nil + if pod.Status.ContainerStatuses[i].State.Running == nil { + continue } - return true + pod.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{ + ExitCode: 137, + Reason: "NotFoundOnProvider", + Message: "The container group has been deleted", + FinishedAt: now, + StartedAt: pod.Status.ContainerStatuses[i].State.Running.StartedAt, + ContainerID: pod.Status.ContainerStatuses[i].ContainerID, + } + pod.Status.ContainerStatuses[i].State.Running = nil } - return false + return true } func (pt *PodsTracker) isPodStatusUpdateRequired(pod *corev1.Pod) bool { diff --git a/internal/provider/provider.go b/internal/provider/provider.go index a66c3d5..d29ace4 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -249,7 +249,7 @@ func (p *SaladCloudProvider) GetPodStatus(ctx context.Context, namespace string, p.logger.WithField("namespace", namespace). WithField("name", name). Errorf("ContainerGroupsAPI.GetPodStatus ", response) - return nil, err + return nil, models.NewSaladCloudError(err, response) } startTime := metav1.NewTime(containerGroup.CreateTime) From 9779517740117d07bd8e2b8fbdb477bd38d0f3be Mon Sep 17 00:00:00 2001 From: lucklypriyansh-2 Date: Fri, 13 Oct 2023 17:02:44 +0530 Subject: [PATCH 4/4] fix container names --- internal/provider/pods_tracker.go | 4 ++-- internal/provider/provider.go | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/provider/pods_tracker.go b/internal/provider/pods_tracker.go index 186ac03..da9eb93 100644 --- a/internal/provider/pods_tracker.go +++ b/internal/provider/pods_tracker.go @@ -87,12 +87,12 @@ func (pt *PodsTracker) removeStalePods() { clusterPodMap[key] = true } for i := range activePods { - if _, exists := clusterPodMap[activePods[i].Name]; !exists { + if _, exists := clusterPodMap[activePods[i].Spec.Containers[0].Name]; !exists { + pt.logger.Debugf("removeStalePodsInCluster: removing stale pod: %s", activePods[i].Name) err := pt.handler.DeletePod(pt.ctx, activePods[i]) if err != nil { pt.logger.WithError(err).Errorf("removeStalePodsInCluster: failed to remove stale pod %v", activePods[i].Name) } - } } } diff --git a/internal/provider/provider.go b/internal/provider/provider.go index d29ace4..d7ce7ca 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -268,7 +268,6 @@ func (p *SaladCloudProvider) GetPodStatus(ctx context.Context, namespace string, } func (p *SaladCloudProvider) GetPods(ctx context.Context) ([]*corev1.Pod, error) { - resp, r, err := p.apiClient.ContainerGroupsAPI.ListContainerGroups(p.contextWithAuth(), p.inputVars.OrganizationName, p.inputVars.ProjectName).Execute() if err != nil { p.logger.Errorf("Error when list ContainerGroupsAPI.ListContainerGroups ", r)