diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index df58642..d2d589e 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -306,7 +306,7 @@ func main() { g.Go(func() error { s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace, clusterUuid) - return s.Run(ctx) + return s.Run(ctx, schemav1.NewNamespace()) }) wg := sync.WaitGroup{} @@ -326,7 +326,7 @@ func main() { wg.Done() - return s.Run(ctx, forwardForNotifications...) + return s.Run(ctx, schemav1.NewNode(), forwardForNotifications...) }) wg.Add(1) @@ -346,6 +346,7 @@ func main() { return s.Run( ctx, + f.New(), syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Pods().UpsertEvents().In())), syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().Pods().DeleteEvents().In())), ) @@ -367,7 +368,7 @@ func main() { wg.Done() - return s.Run(ctx, forwardForNotifications...) + return s.Run(ctx, schemav1.NewDeployment(), forwardForNotifications...) }) wg.Add(1) @@ -386,7 +387,7 @@ func main() { wg.Done() - return s.Run(ctx, forwardForNotifications...) + return s.Run(ctx, schemav1.NewDaemonSet(), forwardForNotifications...) }) wg.Add(1) @@ -405,7 +406,7 @@ func main() { wg.Done() - return s.Run(ctx, forwardForNotifications...) + return s.Run(ctx, schemav1.NewReplicaSet(), forwardForNotifications...) }) wg.Add(1) @@ -424,66 +425,66 @@ func main() { wg.Done() - return s.Run(ctx, forwardForNotifications...) + return s.Run(ctx, schemav1.NewStatefulSet(), forwardForNotifications...) }) g.Go(func() error { s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService, clusterUuid) - return s.Run(ctx) + return s.Run(ctx, schemav1.NewService()) }) g.Go(func() error { s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice, clusterUuid) - return s.Run(ctx) + return s.Run(ctx, schemav1.NewEndpointSlice()) }) g.Go(func() error { s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret, clusterUuid) - return s.Run(ctx) + return s.Run(ctx, schemav1.NewSecret()) }) g.Go(func() error { s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap, clusterUuid) - return s.Run(ctx) + return s.Run(ctx, schemav1.NewConfigMap()) }) g.Go(func() error { s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent, clusterUuid) - return s.Run(ctx, syncv1.WithNoDelete(), syncv1.WithNoWarumup()) + return s.Run(ctx, schemav1.NewEvent(), syncv1.WithNoDelete(), syncv1.WithNoWarumup()) }) g.Go(func() error { s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc, clusterUuid) - return s.Run(ctx) + return s.Run(ctx, schemav1.NewPvc()) }) g.Go(func() error { s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume, clusterUuid) - return s.Run(ctx) + return s.Run(ctx, schemav1.NewPersistentVolume()) }) g.Go(func() error { s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob, clusterUuid) - return s.Run(ctx) + return s.Run(ctx, schemav1.NewJob()) }) g.Go(func() error { s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob, clusterUuid) - return s.Run(ctx) + return s.Run(ctx, schemav1.NewCronJob()) }) g.Go(func() error { s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress, clusterUuid) - return s.Run(ctx) + return s.Run(ctx, schemav1.NewIngress()) }) g.Go(func() error { diff --git a/pkg/sync/v1/sync.go b/pkg/sync/v1/sync.go index d84e432..2318914 100644 --- a/pkg/sync/v1/sync.go +++ b/pkg/sync/v1/sync.go @@ -8,6 +8,7 @@ import ( "github.com/icinga/icinga-kubernetes/pkg/database" schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" "golang.org/x/sync/errgroup" + kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" ) @@ -36,13 +37,13 @@ func NewSync( } } -func (s *Sync) Run(ctx context.Context, features ...Feature) error { +func (s *Sync) Run(ctx context.Context, k8s kmetav1.Object, features ...Feature) error { controller := NewController(s.informer, s.log.WithName("controller")) with := NewFeatures(features...) if !with.NoWarmup() { - if err := s.warmup(ctx, controller); err != nil { + if err := s.warmup(ctx, controller, k8s); err != nil { return err } } @@ -50,12 +51,16 @@ func (s *Sync) Run(ctx context.Context, features ...Feature) error { return s.sync(ctx, controller, features...) } -func (s *Sync) warmup(ctx context.Context, c *Controller) error { +func (s *Sync) warmup(ctx context.Context, c *Controller, k8s kmetav1.Object) error { g, ctx := errgroup.WithContext(ctx) + meta := &schemav1.Meta{ClusterUuid: s.clusterUuid} + query := s.db.BuildSelectStmt(s.factory(), meta) + ` WHERE cluster_uuid=:cluster_uuid` + entities, errs := s.db.YieldAll(ctx, func() (interface{}, error) { return s.factory(), nil - }, s.db.BuildSelectStmt(s.factory(), &schemav1.Meta{})) + }, query, meta) + // Let errors from YieldAll() cancel the group. com.ErrgroupReceive(ctx, g, errs)