Skip to content

Commit

Permalink
Enhance service-pod synchronization with label-based matching
Browse files Browse the repository at this point in the history
- Add SyncServicePods function for managing service-pod synchronization
- Improve service and pod label matching logic using label selectors
- Update main function to utilize SyncServicePods for managing services
  • Loading branch information
jhoxhaa committed Dec 17, 2024
1 parent 36c5f7c commit 3eb4a45
Showing 1 changed file with 117 additions and 1 deletion.
118 changes: 117 additions & 1 deletion cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ import (
"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
v2 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
kclientcmd "k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -293,6 +296,10 @@ func main() {
})
}

g.Go(func() error {
return SyncServicePods(ctx, db, factory.Core().V1().Services(), factory.Core().V1().Pods())
})

if cfg.Prometheus.Url != "" {
promClient, err := promapi.NewClient(promapi.Config{Address: cfg.Prometheus.Url})
if err != nil {
Expand Down Expand Up @@ -439,7 +446,10 @@ func main() {
f := schemav1.NewServiceFactory(clientset)
s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), f.NewService)

return s.Run(ctx)
return s.Run(
ctx,
syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Services().UpsertEvents().In())),
)
})

g.Go(func() error {
Expand Down Expand Up @@ -562,3 +572,109 @@ func dbHasSchema(db *database.Database, dbName string) (bool, error) {

return rows.Next(), rows.Err()
}

func SyncServicePods(ctx context.Context, db *database.Database, serviceList v2.ServiceInformer, podList v2.PodInformer) error {
servicePods := make(chan any)

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return db.UpsertStreamed(ctx, servicePods)
})

g.Go(func() error {
ch := cachev1.Multiplexers().Pods().UpsertEvents().Out()
for {
select {
case pod, more := <-ch:
if !more {
return nil
}
services, err := serviceList.Lister().List(labels.NewSelector())
if err != nil {
return err
}

podLabelsv2 := make(labels.Set)
for _, label := range pod.(*schemav1.Pod).Labels {
podLabelsv2[label.Name] = label.Value
}

for _, service := range services {
labelSelector := &v1.LabelSelector{MatchLabels: service.Spec.Selector}
selector, err := v1.LabelSelectorAsSelector(labelSelector)
if err != nil {
return err
}

if selector.Matches(podLabelsv2) {
select {
case servicePods <- schemav1.ServicePod{
ServiceUuid: schemav1.EnsureUUID(service.UID),
PodUuid: pod.(*schemav1.Pod).Uuid,
}:
case <-ctx.Done():
return ctx.Err()
}
}
}

case <-ctx.Done():
return ctx.Err()
}
}
})

g.Go(func() error {
ch := cachev1.Multiplexers().Services().UpsertEvents().Out()
for {
select {
case service, more := <-ch:
if !more {
return nil
}

labelSelector := new(v1.LabelSelector)
labelSelector.MatchLabels = make(map[string]string)
for _, selector := range service.(*schemav1.Service).Selectors {
labelSelector.MatchLabels[selector.Name] = selector.Value
}

serviceSelectors := service.(*schemav1.Service).Selectors
serviceSelector := labels.NewSelector()
for _, selector := range serviceSelectors {
req, err := labels.NewRequirement(selector.Name, selection.Equals, []string{selector.Value})
if err != nil {
return err
}
serviceSelector = serviceSelector.Add(*req)
}

pods, _ := podList.Lister().List(serviceSelector)
for _, pod := range pods {
var matches int
for k, v := range pod.Labels {
for _, selector := range serviceSelectors {
if k == selector.Name && v == selector.Value {
matches++
}
}
}
if matches == len(pod.Labels) {
select {
case servicePods <- schemav1.ServicePod{
ServiceUuid: service.(*schemav1.Service).Uuid,
PodUuid: schemav1.EnsureUUID(pod.UID),
}:
case <-ctx.Done():
return ctx.Err()
}
}
}
case <-ctx.Done():
return ctx.Err()
}
}
})

return g.Wait()
}

0 comments on commit 3eb4a45

Please sign in to comment.