From 3eb4a45b77a815da8fe5234447a2afe51ea715f7 Mon Sep 17 00:00:00 2001 From: Jonada Hoxha Date: Tue, 17 Dec 2024 15:19:00 +0100 Subject: [PATCH] Enhance `service-pod` synchronization with `label-based` matching - Add SyncServicePods function for managing service-pod synchronization - Improve service and pod label matching logic using label selectors - Update main function to utilize SyncServicePods for managing services --- cmd/icinga-kubernetes/main.go | 118 +++++++++++++++++++++++++++++++++- 1 file changed, 117 insertions(+), 1 deletion(-) diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index e1eb1f5..626257b 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -30,8 +30,11 @@ import ( "github.com/spf13/pflag" "golang.org/x/sync/errgroup" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/informers" + v2 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" kclientcmd "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" @@ -293,6 +296,10 @@ func main() { }) } + g.Go(func() error { + return SyncServicePods(ctx, db, factory.Core().V1().Services(), factory.Core().V1().Pods()) + }) + if cfg.Prometheus.Url != "" { promClient, err := promapi.NewClient(promapi.Config{Address: cfg.Prometheus.Url}) if err != nil { @@ -439,7 +446,10 @@ func main() { f := schemav1.NewServiceFactory(clientset) s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), f.NewService) - return s.Run(ctx) + return s.Run( + ctx, + syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Services().UpsertEvents().In())), + ) }) g.Go(func() error { @@ -562,3 +572,109 @@ func dbHasSchema(db *database.Database, dbName string) (bool, error) { return rows.Next(), rows.Err() } + +func SyncServicePods(ctx context.Context, db *database.Database, serviceList v2.ServiceInformer, podList v2.PodInformer) error { + servicePods := make(chan any) + + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + return db.UpsertStreamed(ctx, servicePods) + }) + + g.Go(func() error { + ch := cachev1.Multiplexers().Pods().UpsertEvents().Out() + for { + select { + case pod, more := <-ch: + if !more { + return nil + } + services, err := serviceList.Lister().List(labels.NewSelector()) + if err != nil { + return err + } + + podLabelsv2 := make(labels.Set) + for _, label := range pod.(*schemav1.Pod).Labels { + podLabelsv2[label.Name] = label.Value + } + + for _, service := range services { + labelSelector := &v1.LabelSelector{MatchLabels: service.Spec.Selector} + selector, err := v1.LabelSelectorAsSelector(labelSelector) + if err != nil { + return err + } + + if selector.Matches(podLabelsv2) { + select { + case servicePods <- schemav1.ServicePod{ + ServiceUuid: schemav1.EnsureUUID(service.UID), + PodUuid: pod.(*schemav1.Pod).Uuid, + }: + case <-ctx.Done(): + return ctx.Err() + } + } + } + + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + g.Go(func() error { + ch := cachev1.Multiplexers().Services().UpsertEvents().Out() + for { + select { + case service, more := <-ch: + if !more { + return nil + } + + labelSelector := new(v1.LabelSelector) + labelSelector.MatchLabels = make(map[string]string) + for _, selector := range service.(*schemav1.Service).Selectors { + labelSelector.MatchLabels[selector.Name] = selector.Value + } + + serviceSelectors := service.(*schemav1.Service).Selectors + serviceSelector := labels.NewSelector() + for _, selector := range serviceSelectors { + req, err := labels.NewRequirement(selector.Name, selection.Equals, []string{selector.Value}) + if err != nil { + return err + } + serviceSelector = serviceSelector.Add(*req) + } + + pods, _ := podList.Lister().List(serviceSelector) + for _, pod := range pods { + var matches int + for k, v := range pod.Labels { + for _, selector := range serviceSelectors { + if k == selector.Name && v == selector.Value { + matches++ + } + } + } + if matches == len(pod.Labels) { + select { + case servicePods <- schemav1.ServicePod{ + ServiceUuid: service.(*schemav1.Service).Uuid, + PodUuid: schemav1.EnsureUUID(pod.UID), + }: + case <-ctx.Done(): + return ctx.Err() + } + } + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + return g.Wait() +}