Skip to content

Commit

Permalink
Add support for cluster-specific warmup and sync operations
Browse files Browse the repository at this point in the history
  • Loading branch information
jhoxhaa committed Nov 25, 2024
1 parent e95e761 commit e3446b4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
33 changes: 17 additions & 16 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func main() {
g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace, clusterUuid)

return s.Run(ctx)
return s.Run(ctx, schemav1.NewNamespace())
})

wg := sync.WaitGroup{}
Expand All @@ -326,7 +326,7 @@ func main() {

wg.Done()

return s.Run(ctx, forwardForNotifications...)
return s.Run(ctx, schemav1.NewNode(), forwardForNotifications...)
})

wg.Add(1)
Expand All @@ -346,6 +346,7 @@ func main() {

return s.Run(
ctx,
f.New(),
syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Pods().UpsertEvents().In())),
syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().Pods().DeleteEvents().In())),
)
Expand All @@ -367,7 +368,7 @@ func main() {

wg.Done()

return s.Run(ctx, forwardForNotifications...)
return s.Run(ctx, schemav1.NewDeployment(), forwardForNotifications...)
})

wg.Add(1)
Expand All @@ -386,7 +387,7 @@ func main() {

wg.Done()

return s.Run(ctx, forwardForNotifications...)
return s.Run(ctx, schemav1.NewDaemonSet(), forwardForNotifications...)
})

wg.Add(1)
Expand All @@ -405,7 +406,7 @@ func main() {

wg.Done()

return s.Run(ctx, forwardForNotifications...)
return s.Run(ctx, schemav1.NewReplicaSet(), forwardForNotifications...)
})

wg.Add(1)
Expand All @@ -424,66 +425,66 @@ func main() {

wg.Done()

return s.Run(ctx, forwardForNotifications...)
return s.Run(ctx, schemav1.NewStatefulSet(), forwardForNotifications...)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService, clusterUuid)

return s.Run(ctx)
return s.Run(ctx, schemav1.NewService())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice, clusterUuid)

return s.Run(ctx)
return s.Run(ctx, schemav1.NewEndpointSlice())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret, clusterUuid)
return s.Run(ctx)
return s.Run(ctx, schemav1.NewSecret())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap, clusterUuid)

return s.Run(ctx)
return s.Run(ctx, schemav1.NewConfigMap())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent, clusterUuid)

return s.Run(ctx, syncv1.WithNoDelete(), syncv1.WithNoWarumup())
return s.Run(ctx, schemav1.NewEvent(), syncv1.WithNoDelete(), syncv1.WithNoWarumup())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc, clusterUuid)

return s.Run(ctx)
return s.Run(ctx, schemav1.NewPvc())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume, clusterUuid)

return s.Run(ctx)
return s.Run(ctx, schemav1.NewPersistentVolume())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob, clusterUuid)

return s.Run(ctx)
return s.Run(ctx, schemav1.NewJob())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob, clusterUuid)

return s.Run(ctx)
return s.Run(ctx, schemav1.NewCronJob())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress, clusterUuid)

return s.Run(ctx)
return s.Run(ctx, schemav1.NewIngress())
})

g.Go(func() error {
Expand Down
13 changes: 9 additions & 4 deletions pkg/sync/v1/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/icinga/icinga-kubernetes/pkg/database"
schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1"
"golang.org/x/sync/errgroup"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
)
Expand Down Expand Up @@ -36,26 +37,30 @@ func NewSync(
}
}

func (s *Sync) Run(ctx context.Context, features ...Feature) error {
func (s *Sync) Run(ctx context.Context, k8s kmetav1.Object, features ...Feature) error {
controller := NewController(s.informer, s.log.WithName("controller"))

with := NewFeatures(features...)

if !with.NoWarmup() {
if err := s.warmup(ctx, controller); err != nil {
if err := s.warmup(ctx, controller, k8s); err != nil {
return err
}
}

return s.sync(ctx, controller, features...)
}

func (s *Sync) warmup(ctx context.Context, c *Controller) error {
func (s *Sync) warmup(ctx context.Context, c *Controller, k8s kmetav1.Object) error {
g, ctx := errgroup.WithContext(ctx)

meta := &schemav1.Meta{ClusterUuid: s.clusterUuid}
query := s.db.BuildSelectStmt(s.factory(), meta) + ` WHERE cluster_uuid=:cluster_uuid`

entities, errs := s.db.YieldAll(ctx, func() (interface{}, error) {
return s.factory(), nil
}, s.db.BuildSelectStmt(s.factory(), &schemav1.Meta{}))
}, query, meta)

// Let errors from YieldAll() cancel the group.
com.ErrgroupReceive(ctx, g, errs)

Expand Down

0 comments on commit e3446b4

Please sign in to comment.