Skip to content

Commit

Permalink
chore(event): rather than timingout try to retry (#18)
Browse files Browse the repository at this point in the history
creating retry
  • Loading branch information
Jacobbrewer1 authored Nov 13, 2024
1 parent cdb080f commit 7c94bce
Showing 1 changed file with 23 additions and 47 deletions.
70 changes: 23 additions & 47 deletions cluster.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package main

import (
"context"
"log/slog"
"os"
"time"

"github.com/google/uuid"
"github.com/jacobbrewer1/workerpool"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -45,25 +44,33 @@ func (a *app) watchNewPods() {
)

for event := range watcher.ResultChan() {
wp.MustSchedule(newEventTask(a, event))
wp.MustSchedule(newEventTask(a, wp, event))
}

slog.Debug("Watcher channel closed")
wp.Stop()
}

type eventTask struct {
id string
a *app
wp *workerpool.WorkerPool
event watch.Event
}

func newEventTask(a *app, event watch.Event) *eventTask {
func newEventTask(a *app, wp *workerpool.WorkerPool, event watch.Event) *eventTask {
return &eventTask{
id: uuid.New().String(),
a: a,
wp: wp,
event: event,
}
}

func (t *eventTask) Run() {
l := slog.With(
slog.String("type", string(t.event.Type)),
slog.String("task_id", t.id),
slog.String("event_type", string(t.event.Type)),
)

pod, ok := t.event.Object.(*core.Pod)
Expand All @@ -73,44 +80,17 @@ func (t *eventTask) Run() {
}

switch t.event.Type {
case watch.Added:
// Wait until the pod is in a running state
//
// Do this over the course of 10 seconds
// with a 1-second delay between each check
case watch.Added, watch.Error:
// Is the pod still in the cluster? This is to prevent retry attempts from getting stuck
if pod.ObjectMeta.DeletionTimestamp != nil {
l.Info("Pod is being deleted, aborting")
return
}

// If the pod is not running, re-schedule the task
if pod.Status.Phase != core.PodRunning {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

found := false
for {
select {
case <-ctx.Done():
l.Warn("Pod did not enter running state in time")
return
default:
var err error
pod, err = t.a.client.CoreV1().Pods(pod.Namespace).Get(context.Background(), pod.Name, metav1.GetOptions{
ResourceVersion: pod.ResourceVersion,
})
if err != nil {
l.Error("Error getting pod", slog.String(loggingKeyError, err.Error()))
return
}

if pod.Status.Phase == core.PodRunning {
// Update the pod object to get the latest information
l.Debug("Pod is running", slog.String("pod", pod.Name))
found = true
break
}
time.Sleep(1 * time.Second)
}

if found {
break
}
}
t.wp.MustSchedule(t)
return
}

vc, err := newVaultClient(generateVaultAddress(pod.Spec.Containers[0].Ports, pod.Status.PodIP))
Expand Down Expand Up @@ -139,11 +119,7 @@ func (t *eventTask) Run() {
l.Error("Error unsealing vault", slog.String(loggingKeyError, err.Error()))
return
}
case watch.Modified:
// Do something
case watch.Deleted:
// Do something
case watch.Error:
case watch.Modified, watch.Deleted, watch.Bookmark:
// Do something
default:
l.Warn("Unknown event type", slog.String("type", string(t.event.Type)))
Expand Down

0 comments on commit 7c94bce

Please sign in to comment.