diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index df58642..91fc818 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -13,6 +13,7 @@ import ( "github.com/icinga/icinga-kubernetes/internal" cachev1 "github.com/icinga/icinga-kubernetes/internal/cache/v1" "github.com/icinga/icinga-kubernetes/pkg/backoff" + "github.com/icinga/icinga-kubernetes/pkg/cluster" "github.com/icinga/icinga-kubernetes/pkg/com" "github.com/icinga/icinga-kubernetes/pkg/daemon" "github.com/icinga/icinga-kubernetes/pkg/database" @@ -40,10 +41,6 @@ import ( "time" ) -type clusterContextKeyType string - -const clusterContextKey clusterContextKeyType = "clusterContextKey" - const expectedSchemaVersion = "0.2.0" func main() { @@ -51,12 +48,14 @@ func main() { var configLocation string var showVersion bool + var clusterName string klog.InitFlags(nil) pflag.CommandLine.AddGoFlagSet(flag.CommandLine) pflag.BoolVar(&showVersion, "version", false, "print version and exit") pflag.StringVar(&configLocation, "config", "./config.yml", "path to the config file") + pflag.StringVar(&clusterName, "cluster-name", "", "name of the current cluster") loadingRules := kclientcmd.NewDefaultClientConfigLoadingRules() loadingRules.DefaultClientConfig = &kclientcmd.DefaultClientConfig @@ -123,16 +122,6 @@ func main() { g, ctx := errgroup.WithContext(context.Background()) - namespaceName := "kube-system" - ns, err := clientset.CoreV1().Namespaces().Get(ctx, namespaceName, v1.GetOptions{}) - if err != nil { - klog.Fatalf("Failed to retrieve namespace '%s': %v. Ensure the cluster is accessible and the namespace exists.", namespaceName, err) - } - - clusterUuid := schemav1.EnsureUUID(ns.UID) - - ctx = context.WithValue(ctx, clusterContextKey, clusterUuid) - if hasSchema { var version string @@ -214,7 +203,25 @@ func main() { klog.Fatal("IGL_DATABASE: ", err) } - if _, err := db.ExecContext(ctx, "DELETE FROM kubernetes_instance"); err != nil { + namespaceName := "kube-system" + ns, err := clientset.CoreV1().Namespaces().Get(context.TODO(), namespaceName, v1.GetOptions{}) + if err != nil { + klog.Fatalf("Failed to retrieve namespace '%s' for cluster '%s': %v", namespaceName, clusterName, err) + } + + clusterInstance := &schemav1.Cluster{ + Uuid: schemav1.EnsureUUID(ns.UID), + Name: clusterName, + } + + ctx = cluster.NewClusterUuidContext(ctx, clusterInstance.Uuid) + + stmt, _ := db.BuildUpsertStmt(clusterInstance) + if _, err := db.NamedExecContext(ctx, stmt, clusterInstance); err != nil { + klog.Error(errors.Wrap(err, "can't update cluster")) + } + + if _, err := db.ExecContext(ctx, "DELETE FROM kubernetes_instance WHERE cluster_uuid = ?", clusterInstance.Uuid); err != nil { klog.Fatal(errors.Wrap(err, "can't delete instance")) } // ,omitempty @@ -230,6 +237,7 @@ func main() { instance := schemav1.Instance{ Uuid: instanceId[:], + ClusterUuid: clusterInstance.Uuid, Version: internal.Version.Version, KubernetesVersion: schemav1.NewNullableString(kubernetesVersion), KubernetesHeartbeat: types.UnixMilli(kubernetesHeartbeat), @@ -248,7 +256,7 @@ func main() { } }, periodic.Immediate()).Stop() - if err := internal.SyncNotificationsConfig(ctx, db2, &cfg.Notifications); err != nil { + if err := internal.SyncNotificationsConfig(ctx, db2, &cfg.Notifications, clusterInstance.Uuid); err != nil { klog.Fatal(err) } @@ -304,7 +312,7 @@ func main() { } g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace) return s.Run(ctx) }) @@ -313,7 +321,7 @@ func main() { wg.Add(1) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -340,7 +348,7 @@ func main() { ) f := schemav1.NewPodFactory(clientset) - s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New) wg.Done() @@ -354,7 +362,7 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment, clusterUuid) + db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -373,7 +381,7 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet, clusterUuid) + db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -392,7 +400,7 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet, clusterUuid) + db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -411,7 +419,7 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet, clusterUuid) + db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -428,60 +436,60 @@ func main() { }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice, clusterUuid) + s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent, clusterUuid) + s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent) return s.Run(ctx, syncv1.WithNoDelete(), syncv1.WithNoWarumup()) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob, clusterUuid) + s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob, clusterUuid) + s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress, clusterUuid) + s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress) return s.Run(ctx) }) diff --git a/internal/notifications.go b/internal/notifications.go index 1ad9570..7e55144 100644 --- a/internal/notifications.go +++ b/internal/notifications.go @@ -11,22 +11,23 @@ import ( "github.com/pkg/errors" ) -func SyncNotificationsConfig(ctx context.Context, db *database.DB, config *notifications.Config) error { +func SyncNotificationsConfig(ctx context.Context, db *database.DB, config *notifications.Config, clusterUuid types.UUID) error { _true := types.Bool{Bool: true, Valid: true} if config.Url != "" { toDb := []schemav1.Config{ - {Key: schemav1.ConfigKeyNotificationsUrl, Value: config.Url, Locked: _true}, - {Key: schemav1.ConfigKeyNotificationsUsername, Value: config.Username, Locked: _true}, - {Key: schemav1.ConfigKeyNotificationsPassword, Value: config.Password, Locked: _true}, + {ClusterUuid: clusterUuid, Key: schemav1.ConfigKeyNotificationsUrl, Value: config.Url, Locked: _true}, + {ClusterUuid: clusterUuid, Key: schemav1.ConfigKeyNotificationsUsername, Value: config.Username, Locked: _true}, + {ClusterUuid: clusterUuid, Key: schemav1.ConfigKeyNotificationsPassword, Value: config.Password, Locked: _true}, } err := db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { if kwebUrl := config.KubernetesWebUrl; kwebUrl != "" { toDb = append(toDb, schemav1.Config{ - Key: schemav1.ConfigKeyNotificationsKubernetesWebUrl, - Value: kwebUrl, - Locked: _true, + ClusterUuid: clusterUuid, + Key: schemav1.ConfigKeyNotificationsKubernetesWebUrl, + Value: kwebUrl, + Locked: _true, }) } else { if err := tx.SelectContext(ctx, &config.KubernetesWebUrl, fmt.Sprintf( @@ -41,7 +42,7 @@ func SyncNotificationsConfig(ctx context.Context, db *database.DB, config *notif if _, err := tx.ExecContext( ctx, fmt.Sprintf( - `DELETE FROM "%s" WHERE "key" LIKE ? AND "locked" = ?`, + `DELETE FROM "%s" WHERE "cluster_uuid" = ? AND "key" LIKE ? AND "locked" = ?`, database.TableName(&schemav1.Config{}), ), `notifications.%`, @@ -65,7 +66,7 @@ func SyncNotificationsConfig(ctx context.Context, db *database.DB, config *notif if _, err := tx.ExecContext( ctx, fmt.Sprintf( - `DELETE FROM "%s" WHERE "key" LIKE ? AND "locked" = ?`, + `DELETE FROM "%s" WHERE "cluster_uuid" = ? AND "key" LIKE ? AND "locked" = ?`, database.TableName(&schemav1.Config{}), ), `notifications.%`, diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go new file mode 100644 index 0000000..9b0aad3 --- /dev/null +++ b/pkg/cluster/cluster.go @@ -0,0 +1,34 @@ +package cluster + +import ( + "context" + "github.com/icinga/icinga-go-library/types" +) + +// Private type to prevent collisions with other context keys +type contextKey string + +// clusterUuidContextKey is the key for Cluster values in contexts. +var clusterUuidContextKey = contextKey("cluster_uuid") + +// NewClusterUuidContext creates a new context that carries the provided cluster UUID. +// The new context is derived from the given parent context and associates the cluster UUID +// with a predefined key (clusterContextKey). +func NewClusterUuidContext(parent context.Context, clusterUuid types.UUID) context.Context { + return context.WithValue(parent, clusterUuidContextKey, clusterUuid) +} + +// ClusterUuidFromContext returns the uuid value of the cluster stored in ctx, if any: +// +// clusterUuid, ok := ClusterUuidFromContext(ctx) +// if !ok { +// // Error handling. +// } +func ClusterUuidFromContext(ctx context.Context) types.UUID { + clusterUuid, ok := ctx.Value(clusterUuidContextKey).(types.UUID) + if !ok { + panic("cluster not found in context") + } + + return clusterUuid +} diff --git a/pkg/schema/v1/cluster.go b/pkg/schema/v1/cluster.go new file mode 100644 index 0000000..0c74679 --- /dev/null +++ b/pkg/schema/v1/cluster.go @@ -0,0 +1,10 @@ +package v1 + +import ( + "github.com/icinga/icinga-go-library/types" +) + +type Cluster struct { + Uuid types.UUID + Name string +} diff --git a/pkg/schema/v1/config.go b/pkg/schema/v1/config.go index 5a6f853..09dcac6 100644 --- a/pkg/schema/v1/config.go +++ b/pkg/schema/v1/config.go @@ -4,9 +4,10 @@ import "github.com/icinga/icinga-go-library/types" // Config represents a single key => value pair database config entry. type Config struct { - Key ConfigKey - Value string - Locked types.Bool + ClusterUuid types.UUID + Key ConfigKey + Value string + Locked types.Bool } // ConfigKey represents the database config.Key enums. diff --git a/pkg/schema/v1/instance.go b/pkg/schema/v1/instance.go index 2519047..a917868 100644 --- a/pkg/schema/v1/instance.go +++ b/pkg/schema/v1/instance.go @@ -7,6 +7,7 @@ import ( type Instance struct { Uuid types.Binary + ClusterUuid types.UUID Version string KubernetesVersion sql.NullString KubernetesHeartbeat types.UnixMilli diff --git a/pkg/sync/v1/sync.go b/pkg/sync/v1/sync.go index e2ad679..0157c9d 100644 --- a/pkg/sync/v1/sync.go +++ b/pkg/sync/v1/sync.go @@ -3,7 +3,7 @@ package v1 import ( "context" "github.com/go-logr/logr" - "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-kubernetes/pkg/cluster" "github.com/icinga/icinga-kubernetes/pkg/com" "github.com/icinga/icinga-kubernetes/pkg/database" schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" @@ -13,11 +13,10 @@ import ( ) type Sync struct { - db *database.Database - informer cache.SharedIndexInformer - log logr.Logger - factory func() schemav1.Resource - clusterUuid types.UUID + db *database.Database + informer cache.SharedIndexInformer + log logr.Logger + factory func() schemav1.Resource } func NewSync( @@ -25,14 +24,12 @@ func NewSync( informer cache.SharedIndexInformer, log logr.Logger, factory func() schemav1.Resource, - clusterUuid types.UUID, ) *Sync { return &Sync{ - db: db, - informer: informer, - log: log, - factory: factory, - clusterUuid: clusterUuid, + db: db, + informer: informer, + log: log, + factory: factory, } } @@ -53,7 +50,7 @@ func (s *Sync) Run(ctx context.Context, features ...Feature) error { func (s *Sync) warmup(ctx context.Context, c *Controller) error { g, ctx := errgroup.WithContext(ctx) - meta := &schemav1.Meta{ClusterUuid: s.clusterUuid} + meta := &schemav1.Meta{ClusterUuid: cluster.ClusterUuidFromContext(ctx)} query := s.db.BuildSelectStmt(s.factory(), meta) + ` WHERE cluster_uuid=:cluster_uuid` entities, errs := s.db.YieldAll(ctx, func() (interface{}, error) { @@ -88,7 +85,7 @@ func (s *Sync) warmup(ctx context.Context, c *Controller) error { func (s *Sync) sync(ctx context.Context, c *Controller, features ...Feature) error { sink := NewSink(func(i *Item) interface{} { entity := s.factory() - entity.Obtain(*i.Item, s.clusterUuid) + entity.Obtain(*i.Item, cluster.ClusterUuidFromContext(ctx)) return entity }, func(k interface{}) interface{} {