Skip to content

Commit

Permalink
support the possibility that the Kargo controller isn't co-located wi…
Browse files Browse the repository at this point in the history
…th its own data or with Argo CD (#409)

Signed-off-by: Kent <[email protected]>
  • Loading branch information
krancour authored Jun 15, 2023
1 parent d17d38c commit 91a0f35
Show file tree
Hide file tree
Showing 12 changed files with 400 additions and 166 deletions.
216 changes: 177 additions & 39 deletions cmd/controlplane/controller.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package main

import (
"strings"
"sync"

argocd "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/akuity/bookkeeper"
api "github.com/akuity/kargo/api/v1alpha1"
"github.com/akuity/kargo/internal/config"
libConfig "github.com/akuity/kargo/internal/config"
"github.com/akuity/kargo/internal/controller/applications"
"github.com/akuity/kargo/internal/controller/environments"
"github.com/akuity/kargo/internal/controller/promotions"
"github.com/akuity/kargo/internal/credentials"
Expand All @@ -34,73 +41,204 @@ func newControllerCommand() *cobra.Command {
"commit": version.GitCommit,
}).Info("Starting Kargo Controller")

config := config.NewControllerConfig()
mgrCfg, err := ctrl.GetConfig()
if err != nil {
return errors.Wrap(err, "get controller config")
}
cfg := libConfig.NewControllerConfig()

scheme := runtime.NewScheme()
if err = corev1.AddToScheme(scheme); err != nil {
return errors.Wrap(err, "add kubernetes core api to scheme")
}
if err = rbacv1.AddToScheme(scheme); err != nil {
return errors.Wrap(err, "add kubernetes rbac api to scheme")
}
if err = argocd.AddToScheme(scheme); err != nil {
return errors.Wrap(err, "add argocd api to scheme")
}
if err = api.AddToScheme(scheme); err != nil {
return errors.Wrap(err, "add kargo api to scheme")
}
mgr, err := ctrl.NewManager(
mgrCfg,
ctrl.Options{
Scheme: scheme,
Port: 9443,
},
)
kargoMgr, argoMgr, err := getMgrs()
if err != nil {
return errors.Wrap(err, "create manager")
return errors.Wrap(err, "error getting controller manager")
}

if err = environments.SetupWebhookWithManager(mgr); err != nil {
if err = environments.SetupWebhookWithManager(kargoMgr); err != nil {
return errors.Wrap(err, "error initializing Environment webhooks")
}
if err =
promotions.SetupWebhookWithManager(ctx, mgr, config); err != nil {
promotions.SetupWebhookWithManager(ctx, kargoMgr, cfg); err != nil {
return errors.Wrap(err, "error initializing Environment webhooks")
}

credentialsDB, err :=
credentials.NewKubernetesDatabase(ctx, config.ArgoCDNamespace, mgr)
credentialsDB, err := credentials.NewKubernetesDatabase(
ctx,
cfg.ArgoCDNamespace,
kargoMgr,
argoMgr,
)
if err != nil {
return errors.Wrap(err, "error initializing credentials DB")
}

if err := environments.SetupReconcilerWithManager(
ctx,
mgr,
kargoMgr,
argoMgr,
credentialsDB,
); err != nil {
return errors.Wrap(err, "setup environment reconciler")
return errors.Wrap(err, "error setting up Environments reconciler")
}
if err := promotions.SetupReconcilerWithManager(
mgr,
kargoMgr,
argoMgr,
credentialsDB,
bookkeeper.NewService(
&bookkeeper.ServiceOptions{
LogLevel: bookkeeper.LogLevel(config.LogLevel),
LogLevel: bookkeeper.LogLevel(cfg.LogLevel),
},
),
); err != nil {
return errors.Wrap(err, "setup promotion reconciler")
return errors.Wrap(err, "error setting up Promotions reconciler")
}
if err :=
applications.SetupReconcilerWithManager(kargoMgr, argoMgr); err != nil {
return errors.Wrap(err, "error setting up Applications reconciler")
}

if err := mgr.Start(ctx); err != nil {
return errors.Wrap(err, "start controller")
var errChan = make(chan error)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
if err := kargoMgr.Start(ctx); err != nil {
errChan <- errors.Wrap(err, "error starting kargo manager")
}
}()
if argoMgr != kargoMgr {
wg.Add(1)
go func() {
defer wg.Done()
if err := argoMgr.Start(ctx); err != nil {
errChan <- errors.Wrap(err, "error starting argo manager")
}
}()
}
return nil

// Adapt wg to a channel that can be used in a select
doneCh := make(chan struct{})
go func() {
wg.Wait()
close(doneCh)
}()

select {
case err := <-errChan:
return err
case <-doneCh:
return nil
}
},
}
}

func getMgrs() (manager.Manager, manager.Manager, error) {
kargoMgrCfg, argoMgrCfg, err := getMgrConfigs()
if err != nil {
// TODO: Wrap this
return nil, nil, err
}

kargoMgrScheme, argoMgrScheme, err := getMgrSchemes(kargoMgrCfg, argoMgrCfg)
if err != nil {
// TODO: Wrap this
return nil, nil, err
}

kargoMgr, err := ctrl.NewManager(
kargoMgrCfg,
ctrl.Options{
Scheme: kargoMgrScheme,
MetricsBindAddress: "0",
Port: 9443,
},
)
if err != nil {
// TODO: Wrap this
return nil, nil, err
}

var argoMgr manager.Manager
if argoMgrScheme == kargoMgrScheme {
argoMgr = kargoMgr
} else {
argoMgr, err = ctrl.NewManager(
argoMgrCfg,
ctrl.Options{
Scheme: argoMgrScheme,
MetricsBindAddress: "0",
},
)
if err != nil {
// TODO: Wrap this
return nil, nil, err
}
}

return kargoMgr, argoMgr, nil
}

func getMgrSchemes(kargoMgrCfg, argoMgrCfg *rest.Config) (*runtime.Scheme, *runtime.Scheme, error) {
kargoMgrScheme := runtime.NewScheme()
var argoMgrScheme *runtime.Scheme
if argoMgrCfg == kargoMgrCfg {
argoMgrScheme = kargoMgrScheme
} else {
argoMgrScheme = runtime.NewScheme()
}

// Schemes used by the Kargo controller manager
if err := corev1.AddToScheme(kargoMgrScheme); err != nil {
// TODO: Wrap this
return nil, nil, err
}
if err := rbacv1.AddToScheme(kargoMgrScheme); err != nil {
// TODO: Wrap this
return nil, nil, err
}
if err := api.AddToScheme(kargoMgrScheme); err != nil {
// TODO: Wrap this
return nil, nil, err
}

// Schemes used by the Argo CD controller manager
if err := corev1.AddToScheme(argoMgrScheme); err != nil {
// TODO: Wrap this
return nil, nil, err
}
if err := argocd.AddToScheme(argoMgrScheme); err != nil {
// TODO: Wrap this
return nil, nil, err
}

return kargoMgrScheme, argoMgrScheme, nil
}

func getMgrConfigs() (*rest.Config, *rest.Config, error) {
const kargoCtx = "kargo"
const argoCtx = "argo"

mgrCfg, err := config.GetConfig()
if err != nil {
// TODO: Wrap this
return nil, nil, err
}

kargoMgrCfg, err := config.GetConfigWithContext(kargoCtx)
if err != nil {
if strings.Contains(err.Error(), "does not exist") {
kargoMgrCfg = mgrCfg
} else {
// TODO: Wrap this
return nil, nil, err
}
}

argoMgrCfg, err := config.GetConfigWithContext(argoCtx)
if err != nil {
if strings.Contains(err.Error(), "does not exist") {
argoMgrCfg = mgrCfg
} else {
// TODO: Wrap this
return nil, nil, err
}
}

return kargoMgrCfg, argoMgrCfg, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ require (
github.com/bufbuild/connect-go v1.7.0
github.com/bufbuild/connect-grpchealth-go v1.0.0
github.com/cosmos/gogoproto v1.4.10
github.com/satori/go.uuid v1.2.0
github.com/technosophos/moniker v0.0.0-20210218184952-3ea787d3943b
gopkg.in/yaml.v3 v3.0.1
oras.land/oras-go v1.2.2
Expand Down Expand Up @@ -164,7 +165,6 @@ require (
github.com/prometheus/procfs v0.8.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday v1.6.0 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/spf13/cast v1.4.1 // indirect
Expand Down
103 changes: 103 additions & 0 deletions internal/controller/applications/applications.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package applications

import (
"context"
"fmt"

argocd "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/fields"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"

api "github.com/akuity/kargo/api/v1alpha1"
"github.com/akuity/kargo/internal/logging"
)

const (
envsByAppIndexField = "applications"
forceReconcileAnnotationKey = "kargo.akuity.io/force-reconcile"
)

// reconciler reconciles Argo CD Application resources.
type reconciler struct {
client client.Client
}

// SetupReconcilerWithManager initializes a reconciler for Argo CD Application
// resources and registers it with the provided Manager.
func SetupReconcilerWithManager(kargoMgr, argoMgr manager.Manager) error {
return ctrl.NewControllerManagedBy(argoMgr).
For(&argocd.Application{}).
Complete(newReconciler(kargoMgr.GetClient()))
}

func newReconciler(client client.Client) *reconciler {
return &reconciler{
client: client,
}
}

// Reconcile is part of the main Kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *reconciler) Reconcile(
ctx context.Context,
req ctrl.Request,
) (ctrl.Result, error) {
result := ctrl.Result{}

logger := logging.LoggerFromContext(ctx).WithFields(log.Fields{
"applicationNamespace": req.NamespacedName.Namespace,
"application": req.NamespacedName.Name,
})
logger.Debug("reconciling Argo CD Application")

// Find all Environments associated with this Application
envs := &api.EnvironmentList{}
if err := r.client.List(
ctx,
envs,
&client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(
envsByAppIndexField,
fmt.Sprintf(
"%s:%s",
req.NamespacedName.Namespace,
req.NamespacedName.Name,
),
),
},
); err != nil {
return result, errors.Wrapf(
err,
"error listing Environments for Application %q in namespace %q",
req.NamespacedName.Name,
req.NamespacedName.Namespace,
)
}

// Force associated Environments to reconcile by patching an annotation
for _, e := range envs.Items {
env := e // This is to sidestep implicit memory aliasing in this for loop
patch := client.MergeFrom(env.DeepCopy())
env.Annotations[forceReconcileAnnotationKey] = uuid.NewV4().String()
if err := r.client.Patch(ctx, &env, patch); err != nil {
logger.Error(err)
return result, errors.Wrapf(
err,
"error patching Environment %q in namespace %q",
env.Name,
env.Namespace,
)
}
logger.WithFields(log.Fields{
"environmentNamespace": env.Namespace,
"environment": env.Name,
}).Debug("successfully patched Environment to force reconciliation")
}

return result, nil
}
2 changes: 1 addition & 1 deletion internal/controller/environments/argocd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (r *reconciler) checkHealth(

for _, check := range argoCDAppUpdates {
app, err :=
r.getArgoCDAppFn(ctx, r.client, check.AppNamespace, check.AppName)
r.getArgoCDAppFn(ctx, r.argoClient, check.AppNamespace, check.AppName)
if err != nil {
if health.Status != api.HealthStateUnhealthy {
health.Status = api.HealthStateUnknown
Expand Down
Loading

0 comments on commit 91a0f35

Please sign in to comment.