diff --git a/cluster.go b/cluster.go index 5a8cfda..4d744bc 100644 --- a/cluster.go +++ b/cluster.go @@ -1,11 +1,10 @@ package main import ( - "context" "log/slog" "os" - "time" + "github.com/google/uuid" "github.com/jacobbrewer1/workerpool" core "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -45,25 +44,33 @@ func (a *app) watchNewPods() { ) for event := range watcher.ResultChan() { - wp.MustSchedule(newEventTask(a, event)) + wp.MustSchedule(newEventTask(a, wp, event)) } + + slog.Debug("Watcher channel closed") + wp.Stop() } type eventTask struct { + id string a *app + wp *workerpool.WorkerPool event watch.Event } -func newEventTask(a *app, event watch.Event) *eventTask { +func newEventTask(a *app, wp *workerpool.WorkerPool, event watch.Event) *eventTask { return &eventTask{ + id: uuid.New().String(), a: a, + wp: wp, event: event, } } func (t *eventTask) Run() { l := slog.With( - slog.String("type", string(t.event.Type)), + slog.String("task_id", t.id), + slog.String("event_type", string(t.event.Type)), ) pod, ok := t.event.Object.(*core.Pod) @@ -73,44 +80,17 @@ func (t *eventTask) Run() { } switch t.event.Type { - case watch.Added: - // Wait until the pod is in a running state - // - // Do this over the course of 10 seconds - // with a 1-second delay between each check + case watch.Added, watch.Error: + // Is the pod still in the cluster? This is to prevent retry attempts from getting stuck + if pod.ObjectMeta.DeletionTimestamp != nil { + l.Info("Pod is being deleted, aborting") + return + } + + // If the pod is not running, re-schedule the task if pod.Status.Phase != core.PodRunning { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - found := false - for { - select { - case <-ctx.Done(): - l.Warn("Pod did not enter running state in time") - return - default: - var err error - pod, err = t.a.client.CoreV1().Pods(pod.Namespace).Get(context.Background(), pod.Name, metav1.GetOptions{ - ResourceVersion: pod.ResourceVersion, - }) - if err != nil { - l.Error("Error getting pod", slog.String(loggingKeyError, err.Error())) - return - } - - if pod.Status.Phase == core.PodRunning { - // Update the pod object to get the latest information - l.Debug("Pod is running", slog.String("pod", pod.Name)) - found = true - break - } - time.Sleep(1 * time.Second) - } - - if found { - break - } - } + t.wp.MustSchedule(t) + return } vc, err := newVaultClient(generateVaultAddress(pod.Spec.Containers[0].Ports, pod.Status.PodIP)) @@ -139,11 +119,7 @@ func (t *eventTask) Run() { l.Error("Error unsealing vault", slog.String(loggingKeyError, err.Error())) return } - case watch.Modified: - // Do something - case watch.Deleted: - // Do something - case watch.Error: + case watch.Modified, watch.Deleted, watch.Bookmark: // Do something default: l.Warn("Unknown event type", slog.String("type", string(t.event.Type)))