Skip to content

Commit

Permalink
Sync clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
jhoxhaa committed Dec 2, 2024
1 parent d4d7100 commit 07d1f90
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 59 deletions.
74 changes: 41 additions & 33 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/icinga/icinga-kubernetes/internal"
cachev1 "github.com/icinga/icinga-kubernetes/internal/cache/v1"
"github.com/icinga/icinga-kubernetes/pkg/backoff"
"github.com/icinga/icinga-kubernetes/pkg/cluster"
"github.com/icinga/icinga-kubernetes/pkg/com"
"github.com/icinga/icinga-kubernetes/pkg/daemon"
"github.com/icinga/icinga-kubernetes/pkg/database"
Expand Down Expand Up @@ -40,23 +41,21 @@ import (
"time"
)

type clusterContextKeyType string

const clusterContextKey clusterContextKeyType = "clusterContextKey"

const expectedSchemaVersion = "0.2.0"

func main() {
runtime.ReallyCrash = true

var configLocation string
var showVersion bool
var clusterName string

klog.InitFlags(nil)
pflag.CommandLine.AddGoFlagSet(flag.CommandLine)

pflag.BoolVar(&showVersion, "version", false, "print version and exit")
pflag.StringVar(&configLocation, "config", "./config.yml", "path to the config file")
pflag.StringVar(&clusterName, "cluster-name", "", "name of the current cluster")

loadingRules := kclientcmd.NewDefaultClientConfigLoadingRules()
loadingRules.DefaultClientConfig = &kclientcmd.DefaultClientConfig
Expand Down Expand Up @@ -123,16 +122,6 @@ func main() {

g, ctx := errgroup.WithContext(context.Background())

namespaceName := "kube-system"
ns, err := clientset.CoreV1().Namespaces().Get(ctx, namespaceName, v1.GetOptions{})
if err != nil {
klog.Fatalf("Failed to retrieve namespace '%s': %v. Ensure the cluster is accessible and the namespace exists.", namespaceName, err)
}

clusterUuid := schemav1.EnsureUUID(ns.UID)

ctx = context.WithValue(ctx, clusterContextKey, clusterUuid)

if hasSchema {
var version string

Expand Down Expand Up @@ -214,7 +203,25 @@ func main() {
klog.Fatal("IGL_DATABASE: ", err)
}

if _, err := db.ExecContext(ctx, "DELETE FROM kubernetes_instance"); err != nil {
namespaceName := "kube-system"
ns, err := clientset.CoreV1().Namespaces().Get(context.TODO(), namespaceName, v1.GetOptions{})
if err != nil {
klog.Fatalf("Failed to retrieve namespace '%s' for cluster '%s': %v", namespaceName, clusterName, err)
}

clusterInstance := &schemav1.Cluster{
Uuid: schemav1.EnsureUUID(ns.UID),
Name: clusterName,
}

ctx = cluster.NewClusterUuidContext(ctx, clusterInstance.Uuid)

stmt, _ := db.BuildUpsertStmt(clusterInstance)
if _, err := db.NamedExecContext(ctx, stmt, clusterInstance); err != nil {
klog.Error(errors.Wrap(err, "can't update cluster"))
}

if _, err := db.ExecContext(ctx, "DELETE FROM kubernetes_instance WHERE cluster_uuid = ?", clusterInstance.Uuid); err != nil {
klog.Fatal(errors.Wrap(err, "can't delete instance"))
}
// ,omitempty
Expand All @@ -230,6 +237,7 @@ func main() {

instance := schemav1.Instance{
Uuid: instanceId[:],
ClusterUuid: clusterInstance.Uuid,
Version: internal.Version.Version,
KubernetesVersion: schemav1.NewNullableString(kubernetesVersion),
KubernetesHeartbeat: types.UnixMilli(kubernetesHeartbeat),
Expand All @@ -248,7 +256,7 @@ func main() {
}
}, periodic.Immediate()).Stop()

if err := internal.SyncNotificationsConfig(ctx, db2, &cfg.Notifications); err != nil {
if err := internal.SyncNotificationsConfig(ctx, db2, &cfg.Notifications, clusterInstance.Uuid); err != nil {
klog.Fatal(err)
}

Expand Down Expand Up @@ -304,7 +312,7 @@ func main() {
}

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace)

return s.Run(ctx)
})
Expand All @@ -313,7 +321,7 @@ func main() {

wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -340,7 +348,7 @@ func main() {
)

f := schemav1.NewPodFactory(clientset)
s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New)

wg.Done()

Expand All @@ -354,7 +362,7 @@ func main() {
wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(
db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment, clusterUuid)
db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -373,7 +381,7 @@ func main() {
wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(
db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet, clusterUuid)
db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -392,7 +400,7 @@ func main() {
wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(
db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet, clusterUuid)
db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -411,7 +419,7 @@ func main() {
wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(
db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet, clusterUuid)
db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -428,60 +436,60 @@ func main() {
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice, clusterUuid)
s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret)
return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent, clusterUuid)
s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent)

return s.Run(ctx, syncv1.WithNoDelete(), syncv1.WithNoWarumup())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob, clusterUuid)
s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob, clusterUuid)
s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress, clusterUuid)
s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress)

return s.Run(ctx)
})
Expand Down
19 changes: 10 additions & 9 deletions internal/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,23 @@ import (
"github.com/pkg/errors"
)

func SyncNotificationsConfig(ctx context.Context, db *database.DB, config *notifications.Config) error {
func SyncNotificationsConfig(ctx context.Context, db *database.DB, config *notifications.Config, clusterUuid types.UUID) error {
_true := types.Bool{Bool: true, Valid: true}

if config.Url != "" {
toDb := []schemav1.Config{
{Key: schemav1.ConfigKeyNotificationsUrl, Value: config.Url, Locked: _true},
{Key: schemav1.ConfigKeyNotificationsUsername, Value: config.Username, Locked: _true},
{Key: schemav1.ConfigKeyNotificationsPassword, Value: config.Password, Locked: _true},
{ClusterUuid: clusterUuid, Key: schemav1.ConfigKeyNotificationsUrl, Value: config.Url, Locked: _true},
{ClusterUuid: clusterUuid, Key: schemav1.ConfigKeyNotificationsUsername, Value: config.Username, Locked: _true},
{ClusterUuid: clusterUuid, Key: schemav1.ConfigKeyNotificationsPassword, Value: config.Password, Locked: _true},
}

err := db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error {
if kwebUrl := config.KubernetesWebUrl; kwebUrl != "" {
toDb = append(toDb, schemav1.Config{
Key: schemav1.ConfigKeyNotificationsKubernetesWebUrl,
Value: kwebUrl,
Locked: _true,
ClusterUuid: clusterUuid,
Key: schemav1.ConfigKeyNotificationsKubernetesWebUrl,
Value: kwebUrl,
Locked: _true,
})
} else {
if err := tx.SelectContext(ctx, &config.KubernetesWebUrl, fmt.Sprintf(
Expand All @@ -41,7 +42,7 @@ func SyncNotificationsConfig(ctx context.Context, db *database.DB, config *notif
if _, err := tx.ExecContext(
ctx,
fmt.Sprintf(
`DELETE FROM "%s" WHERE "key" LIKE ? AND "locked" = ?`,
`DELETE FROM "%s" WHERE "cluster_uuid" = ? AND "key" LIKE ? AND "locked" = ?`,
database.TableName(&schemav1.Config{}),
),
`notifications.%`,
Expand All @@ -65,7 +66,7 @@ func SyncNotificationsConfig(ctx context.Context, db *database.DB, config *notif
if _, err := tx.ExecContext(
ctx,
fmt.Sprintf(
`DELETE FROM "%s" WHERE "key" LIKE ? AND "locked" = ?`,
`DELETE FROM "%s" WHERE "cluster_uuid" = ? AND "key" LIKE ? AND "locked" = ?`,
database.TableName(&schemav1.Config{}),
),
`notifications.%`,
Expand Down
34 changes: 34 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cluster

import (
"context"
"github.com/icinga/icinga-go-library/types"
)

// Private type to prevent collisions with other context keys
type contextKey string

// clusterUuidContextKey is the key for Cluster values in contexts.
var clusterUuidContextKey = contextKey("cluster_uuid")

// NewClusterUuidContext creates a new context that carries the provided cluster UUID.
// The new context is derived from the given parent context and associates the cluster UUID
// with a predefined key (clusterContextKey).
func NewClusterUuidContext(parent context.Context, clusterUuid types.UUID) context.Context {
return context.WithValue(parent, clusterUuidContextKey, clusterUuid)
}

// ClusterUuidFromContext returns the uuid value of the cluster stored in ctx, if any:
//
// clusterUuid, ok := ClusterUuidFromContext(ctx)
// if !ok {
// // Error handling.
// }
func ClusterUuidFromContext(ctx context.Context) types.UUID {
clusterUuid, ok := ctx.Value(clusterUuidContextKey).(types.UUID)
if !ok {
panic("cluster not found in context")
}

return clusterUuid
}
10 changes: 10 additions & 0 deletions pkg/schema/v1/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package v1

import (
"github.com/icinga/icinga-go-library/types"
)

type Cluster struct {
Uuid types.UUID
Name string
}
7 changes: 4 additions & 3 deletions pkg/schema/v1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import "github.com/icinga/icinga-go-library/types"

// Config represents a single key => value pair database config entry.
type Config struct {
Key ConfigKey
Value string
Locked types.Bool
ClusterUuid types.UUID
Key ConfigKey
Value string
Locked types.Bool
}

// ConfigKey represents the database config.Key enums.
Expand Down
1 change: 1 addition & 0 deletions pkg/schema/v1/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

type Instance struct {
Uuid types.Binary
ClusterUuid types.UUID
Version string
KubernetesVersion sql.NullString
KubernetesHeartbeat types.UnixMilli
Expand Down
Loading

0 comments on commit 07d1f90

Please sign in to comment.