Skip to content

Commit

Permalink
feat: acr controller in 2.12 (#336)
Browse files Browse the repository at this point in the history
* acr v1

* feat: acr v1.1 (#334)

* acr v1.1

* acr v1.1

* acr v1

* acr v1

* acr controller

* fix linter

* acr controller

* revert changes

* revert changes

* make lint issues

* make lint issues

* make lint issues

* make lint issues

* make lint issues

* make lint issues

* make lint issues

* make lint issues

* change changelog
  • Loading branch information
pasha-codefresh authored Sep 23, 2024
1 parent 2886cfb commit 0dc3234
Show file tree
Hide file tree
Showing 42 changed files with 4,207 additions and 1,069 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ USER root
RUN ln -s /usr/local/bin/argocd /usr/local/bin/argocd-server && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-repo-server && \
ln -s /usr/local/bin/argocd /usr/local/bin/event-reporter-server && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-application-change-revision-controller && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-cmp-server && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-application-controller && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-dex && \
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ define run-in-test-client
-e GITHUB_TOKEN \
-e GOCACHE=/tmp/go-build-cache \
-e ARGOCD_LINT_GOGC=$(ARGOCD_LINT_GOGC) \
-e GOSUMDB=off \
-v ${DOCKER_SRC_MOUNT} \
-v ${GOPATH}/pkg/mod:/go/pkg/mod${VOLUME_MOUNT} \
-v ${GOCACHE}:/tmp/go-build-cache${VOLUME_MOUNT} \
Expand Down
98 changes: 98 additions & 0 deletions acr_controller/application/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package application_change_revision_controller

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"

"google.golang.org/grpc"

appclient "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
)

//go:generate go run github.com/vektra/mockery/[email protected] --name=ApplicationClient

type ApplicationClient interface {
GetChangeRevision(ctx context.Context, in *appclient.ChangeRevisionRequest, opts ...grpc.CallOption) (*appclient.ChangeRevisionResponse, error)
}

type httpApplicationClient struct {
httpClient *http.Client
baseUrl string
token string
rootpath string
}

func NewHttpApplicationClient(token string, address string, rootpath string) ApplicationClient {
if rootpath != "" && !strings.HasPrefix(rootpath, "/") {
rootpath = "/" + rootpath
}

if !strings.Contains(address, "http") {
address = "http://" + address
}

if rootpath != "" {
address = address + rootpath
}

return &httpApplicationClient{
httpClient: &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
// Support for insecure connections
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
},
baseUrl: address,
token: token,
rootpath: rootpath,
}
}

func (c *httpApplicationClient) execute(ctx context.Context, url string, result interface{}, printBody ...bool) error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+c.token)

res, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

b, _ := io.ReadAll(res.Body)

isStatusOK := res.StatusCode >= 200 && res.StatusCode < 300
if !isStatusOK {
return fmt.Errorf("argocd server respond with code %d, msg is: %s", res.StatusCode, string(b))
}

err = json.Unmarshal(b, &result)
if err != nil {
return err
}
return nil
}

func (c *httpApplicationClient) GetChangeRevision(ctx context.Context, in *appclient.ChangeRevisionRequest, opts ...grpc.CallOption) (*appclient.ChangeRevisionResponse, error) {
params := fmt.Sprintf("?appName=%s&namespace=%s&currentRevision=%s&previousRevision=%s", in.GetAppName(), in.GetNamespace(), in.GetCurrentRevision(), in.GetPreviousRevision())

url := fmt.Sprintf("%s/api/v1/application/changeRevision%s", c.baseUrl, params)

changeRevisionResponse := &appclient.ChangeRevisionResponse{}
err := c.execute(ctx, url, changeRevisionResponse)
if err != nil {
return nil, err
}
return changeRevisionResponse, nil
}
69 changes: 69 additions & 0 deletions acr_controller/application/mocks/ApplicationClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 107 additions & 0 deletions acr_controller/controller/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package application_change_revision_controller

import (
"sync"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/watch"

appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
)

type subscriber struct {
ch chan *appv1.ApplicationWatchEvent
filters []func(*appv1.ApplicationWatchEvent) bool
}

func (s *subscriber) matches(event *appv1.ApplicationWatchEvent) bool {
for i := range s.filters {
if !s.filters[i](event) {
return false
}
}
return true
}

// Broadcaster is an interface for broadcasting application informer watch events to multiple subscribers.
type Broadcaster interface {
Subscribe(ch chan *appv1.ApplicationWatchEvent, filters ...func(event *appv1.ApplicationWatchEvent) bool) func()
OnAdd(interface{}, bool)
OnUpdate(interface{}, interface{})
OnDelete(interface{})
}

type broadcasterHandler struct {
lock sync.Mutex
subscribers []*subscriber
}

func NewBroadcaster() Broadcaster {
return &broadcasterHandler{}
}

func (b *broadcasterHandler) notify(event *appv1.ApplicationWatchEvent) {
val, ok := event.Application.Annotations[appv1.AnnotationKeyManifestGeneratePaths]
if !ok || val == "" {
return
}

// Make a local copy of b.subscribers, then send channel events outside the lock,
// to avoid data race on b.subscribers changes
subscribers := []*subscriber{}
b.lock.Lock()
subscribers = append(subscribers, b.subscribers...)
b.lock.Unlock()

for _, s := range subscribers {
if s.matches(event) {
select {
case s.ch <- event:
{
// log.Infof("adding application '%s' to channel", event.Application.Name)
}
default:
// drop event if cannot send right away
log.WithField("application", event.Application.Name).Warn("unable to send event notification")
}
}
}
}

// Subscribe forward application informer watch events to the provided channel.
// The watch events are dropped if no receives are reading events from the channel so the channel must have
// buffer if dropping events is not acceptable.
func (b *broadcasterHandler) Subscribe(ch chan *appv1.ApplicationWatchEvent, filters ...func(event *appv1.ApplicationWatchEvent) bool) func() {
b.lock.Lock()
defer b.lock.Unlock()
subscriber := &subscriber{ch, filters}
b.subscribers = append(b.subscribers, subscriber)
return func() {
b.lock.Lock()
defer b.lock.Unlock()
for i := range b.subscribers {
if b.subscribers[i] == subscriber {
b.subscribers = append(b.subscribers[:i], b.subscribers[i+1:]...)
break
}
}
}
}

func (b *broadcasterHandler) OnAdd(obj interface{}, isInInitialList bool) {
if app, ok := obj.(*appv1.Application); ok {
b.notify(&appv1.ApplicationWatchEvent{Application: *app, Type: watch.Added})
}
}

func (b *broadcasterHandler) OnUpdate(_, newObj interface{}) {
if app, ok := newObj.(*appv1.Application); ok {
b.notify(&appv1.ApplicationWatchEvent{Application: *app, Type: watch.Modified})
}
}

func (b *broadcasterHandler) OnDelete(obj interface{}) {
if app, ok := obj.(*appv1.Application); ok {
b.notify(&appv1.ApplicationWatchEvent{Application: *app, Type: watch.Deleted})
}
}
85 changes: 85 additions & 0 deletions acr_controller/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package application_change_revision_controller

import (
"context"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"

appclient "github.com/argoproj/argo-cd/v2/acr_controller/application"
"github.com/argoproj/argo-cd/v2/acr_controller/service"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"

appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
servercache "github.com/argoproj/argo-cd/v2/server/cache"
"github.com/argoproj/argo-cd/v2/util/settings"
)

var watchAPIBufferSize = 1000

type ACRController interface {
Run(ctx context.Context)
}

type applicationChangeRevisionController struct {
settingsMgr *settings.SettingsManager
appBroadcaster Broadcaster
cache *servercache.Cache
appLister applisters.ApplicationLister
applicationServiceClient appclient.ApplicationClient
acrService service.ACRService
applicationClientset appclientset.Interface
}

func NewApplicationChangeRevisionController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, applicationClientset appclientset.Interface) ACRController {
appBroadcaster := NewBroadcaster()
_, err := appInformer.AddEventHandler(appBroadcaster)
if err != nil {
log.Error(err)
}
return &applicationChangeRevisionController{
appBroadcaster: appBroadcaster,
cache: cache,
settingsMgr: settingsMgr,
applicationServiceClient: applicationServiceClient,
appLister: appLister,
applicationClientset: applicationClientset,
acrService: service.NewACRService(applicationClientset, applicationServiceClient),
}
}

func (c *applicationChangeRevisionController) Run(ctx context.Context) {
var logCtx log.FieldLogger = log.StandardLogger()

calculateIfPermitted := func(ctx context.Context, a appv1.Application, eventType watch.EventType, ts string) error {
if eventType == watch.Bookmark || eventType == watch.Deleted {
return nil // ignore this event
}

return c.acrService.ChangeRevision(ctx, &a)
}

// TODO: move to abstraction
eventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize)
unsubscribe := c.appBroadcaster.Subscribe(eventsChannel)
defer unsubscribe()
for {
select {
case <-ctx.Done():
return
case event := <-eventsChannel:
// logCtx.Infof("channel size is %d", len(eventsChannel))

ts := time.Now().Format("2006-01-02T15:04:05.000Z")
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
err := calculateIfPermitted(ctx, event.Application, event.Type, ts)
if err != nil {
logCtx.WithError(err).Error("failed to calculate change revision")
}
cancel()
}
}
}
Loading

0 comments on commit 0dc3234

Please sign in to comment.