diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 3f8954b..246f343 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -8,12 +8,12 @@ import ( "github.com/icinga/icinga-go-library/config" igldatabase "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/periodic" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-kubernetes/internal" "github.com/icinga/icinga-kubernetes/pkg/com" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/metrics" - "github.com/icinga/icinga-kubernetes/pkg/periodic" schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" "github.com/icinga/icinga-kubernetes/pkg/sync" syncv1 "github.com/icinga/icinga-kubernetes/pkg/sync/v1" @@ -269,28 +269,45 @@ func main() { return s.Run(ctx) }) - errs := make(chan error, 1) - defer close(errs) - defer periodic.Start(ctx, time.Hour, func(tick periodic.Tick) { - olderThan := tick.Time.AddDate(0, 0, -1) - - _, err := db.CleanupOlderThan( - ctx, database.CleanupStmt{ - Table: "event", - PK: "uuid", - Column: "created", - }, 5000, olderThan, - ) - if err != nil { - select { - case errs <- err: - case <-ctx.Done(): - } + g.Go(func() error { + return db.PeriodicCleanup(ctx, database.CleanupStmt{ + Table: "event", + PK: "uuid", + Column: "created", + }) + }) - return - } - }, periodic.Immediate()).Stop() - com.ErrgroupReceive(ctx, g, errs) + g.Go(func() error { + return db.PeriodicCleanup(ctx, database.CleanupStmt{ + Table: "prometheus_cluster_metric", + PK: "(cluster_uuid, timestamp, category, name)", + Column: "timestamp", + }) + }) + + g.Go(func() error { + return db.PeriodicCleanup(ctx, database.CleanupStmt{ + Table: "prometheus_node_metric", + PK: "(node_uuid, timestamp, category, name)", + Column: "timestamp", + }) + }) + + g.Go(func() error { + return db.PeriodicCleanup(ctx, database.CleanupStmt{ + Table: "prometheus_pod_metric", + PK: "(pod_uuid, timestamp, category, name)", + Column: "timestamp", + }) + }) + + g.Go(func() error { + return db.PeriodicCleanup(ctx, database.CleanupStmt{ + Table: "prometheus_container_metric", + PK: "(container_uuid, timestamp, category, name)", + Column: "timestamp", + }) + }) if err := g.Wait(); err != nil { klog.Fatal(err) diff --git a/pkg/database/cleanup.go b/pkg/database/cleanup.go index 442da0f..8194c78 100644 --- a/pkg/database/cleanup.go +++ b/pkg/database/cleanup.go @@ -7,6 +7,8 @@ import ( "github.com/icinga/icinga-go-library/retry" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-kubernetes/pkg/com" + "github.com/icinga/icinga-kubernetes/pkg/periodic" + "golang.org/x/sync/errgroup" "time" ) @@ -103,3 +105,31 @@ func (db *Database) CleanupOlderThan( type cleanupWhere struct { Time types.UnixMilli } + +func (db *Database) PeriodicCleanup(ctx context.Context, stmt CleanupStmt) error { + g, ctxCleanup := errgroup.WithContext(ctx) + + errs := make(chan error, 1) + defer close(errs) + + periodic.Start(ctx, time.Hour, func(tick periodic.Tick) { + olderThan := tick.Time.AddDate(0, 0, -1) + + _, err := db.CleanupOlderThan( + ctx, stmt, 5000, olderThan, + ) + + if err != nil { + select { + case errs <- err: + case <-ctx.Done(): + } + + return + } + }, periodic.Immediate()).Stop() + + com.ErrgroupReceive(ctxCleanup, g, errs) + + return g.Wait() +}