Skip to content

Commit

Permalink
feat: event reporter use http for communication with argocd (#269)
Browse files Browse the repository at this point in the history
* feat: support multiple transports

* add logs

* fix linter
  • Loading branch information
pasha-codefresh authored Dec 13, 2023
1 parent d5dd4df commit 99e14b7
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
controller: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-application-controller $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''}"
api-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-server $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --disable-auth=${ARGOCD_E2E_DISABLE_AUTH:-'true'} --insecure --dex-server http://localhost:${ARGOCD_E2E_DEX_PORT:-5556} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --port ${ARGOCD_E2E_APISERVER_PORT:-8080} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''}"
dex: sh -c "ARGOCD_BINARY_NAME=argocd-dex go run github.com/argoproj/argo-cd/v2/cmd gendexcfg -o `pwd`/dist/dex.yaml && (test -f dist/dex.yaml || { echo 'Failed to generate dex configuration'; exit 1; }) && docker run --rm -p ${ARGOCD_E2E_DEX_PORT:-5556}:${ARGOCD_E2E_DEX_PORT:-5556} -v `pwd`/dist/dex.yaml:/dex.yaml ghcr.io/dexidp/dex:$(grep "image: ghcr.io/dexidp/dex" manifests/base/dex/argocd-dex-server-deployment.yaml | cut -d':' -f3) dex serve /dex.yaml"
redis: bash -c "if [ \"$ARGOCD_REDIS_LOCAL\" = 'true' ]; then redis-server --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; else docker run --rm --name argocd-redis -i -p ${ARGOCD_E2E_REDIS_PORT:-6379}:${ARGOCD_E2E_REDIS_PORT:-6379} docker.io/library/redis:$(grep "image: redis" manifests/base/redis/argocd-redis-deployment.yaml | cut -d':' -f3) --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; fi"
redis: bash -c "if [ \"$ARGOCD_REDIS_LOCAL\" = 'true' ]; then redis-server --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; else docker run --rm --name argocd-redis -i -p ${ARGOCD_E2E_REDIS_PORT:-6379}:${ARGOCD_E2E_REDIS_PORT:-6379} docker.io/library/redis:7.0.14-alpine --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; fi"
repo-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_GNUPGHOME=${ARGOCD_GNUPGHOME:-/tmp/argocd-local/gpg/keys} ARGOCD_PLUGINSOCKFILEPATH=${ARGOCD_PLUGINSOCKFILEPATH:-./test/cmp} ARGOCD_GPG_DATA_PATH=${ARGOCD_GPG_DATA_PATH:-/tmp/argocd-local/gpg/source} ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-repo-server ARGOCD_GPG_ENABLED=${ARGOCD_GPG_ENABLED:-false} $COMMAND --loglevel debug --port ${ARGOCD_E2E_REPOSERVER_PORT:-8081} --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --otlp-address=${ARGOCD_OTLP_ADDRESS}"
cmp-server: [ "$ARGOCD_E2E_TEST" = 'true' ] && exit 0 || [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_BINARY_NAME=argocd-cmp-server ARGOCD_PLUGINSOCKFILEPATH=${ARGOCD_PLUGINSOCKFILEPATH:-./test/cmp} $COMMAND --config-dir-path ./test/cmp --loglevel debug --otlp-address=${ARGOCD_OTLP_ADDRESS}"
ui: sh -c 'cd ui && ${ARGOCD_E2E_YARN_CMD:-yarn} start'
Expand Down
46 changes: 27 additions & 19 deletions cmd/event-reporter-server/commands/event_reporter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/argoproj/argo-cd/v2/event_reporter"
appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
"github.com/argoproj/argo-cd/v2/event_reporter/codefresh"
"github.com/argoproj/argo-cd/v2/pkg/apiclient"

Expand Down Expand Up @@ -45,6 +46,27 @@ func init() {
failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, failureRetryPeriodMilliSeconds, 0, 1000)
}

func getApplicationClient(useGrpc bool, address, token string) appclient.ApplicationClient {
if useGrpc {
applicationClientSet, err := apiclient.NewClient(&apiclient.ClientOptions{
ServerAddr: address,
Insecure: true,
GRPCWeb: true,
PlainText: true,
AuthToken: token,
})

errors.CheckError(err)

_, applicationClient, err := applicationClientSet.NewApplicationClient()

errors.CheckError(err)

return applicationClient
}
return appclient.NewHttpApplicationClient(token, address)
}

// NewCommand returns a new instance of an event reporter command
func NewCommand() *cobra.Command {
var (
Expand All @@ -68,6 +90,7 @@ func NewCommand() *cobra.Command {
codefreshUrl string
codefreshToken string
shardingAlgorithm string
useGrpc bool
)
var command = &cobra.Command{
Use: cliName,
Expand Down Expand Up @@ -130,24 +153,6 @@ func NewCommand() *cobra.Command {

repoclientset := repoapiclient.NewRepoServerClientset(repoServerAddress, repoServerTimeoutSeconds, tlsConfig)

applicationClientSet, err := apiclient.NewClient(&apiclient.ClientOptions{
ServerAddr: applicationServerAddress,
Insecure: true,
GRPCWeb: true,
PlainText: true,
AuthToken: argocdToken,
})

errors.CheckError(err)

closer, applicationClient, err := applicationClientSet.NewApplicationClient()

errors.CheckError(err)

defer func() {
_ = closer.Close()
}()

eventReporterServerOpts := event_reporter.EventReporterServerOpts{
ListenPort: listenPort,
ListenHost: listenHost,
Expand All @@ -160,13 +165,15 @@ func NewCommand() *cobra.Command {
Cache: cache,
RedisClient: redisClient,
ApplicationNamespaces: applicationNamespaces,
ApplicationServiceClient: applicationClient,
ApplicationServiceClient: getApplicationClient(useGrpc, applicationServerAddress, argocdToken),
CodefreshConfig: &codefresh.CodefreshConfig{
BaseURL: codefreshUrl,
AuthToken: codefreshToken,
},
}

log.Infof("Starting event reporter server with grpc transport %v", useGrpc)

stats.RegisterStackDumper()
stats.StartStatsTicker(10 * time.Minute)
stats.RegisterHeapDumper("memprofile")
Expand Down Expand Up @@ -206,6 +213,7 @@ func NewCommand() *cobra.Command {
command.Flags().StringVar(&codefreshUrl, "codefresh-url", env.StringFromEnv("CODEFRESH_URL", "https://g.codefresh.io"), "Codefresh API url")
command.Flags().StringVar(&codefreshToken, "codefresh-token", env.StringFromEnv("CODEFRESH_TOKEN", ""), "Codefresh token")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvEventReporterShardingAlgorithm, common.DefaultEventReporterShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy] ")
command.Flags().BoolVar(&useGrpc, "grpc", env.ParseBoolFromEnv("USE_GRPC", true), "Use grpc for interact with argocd server")
cacheSrc = servercache.AddCacheFlagsToCmd(command, func(client *redis.Client) {
redisClient = client
})
Expand Down
147 changes: 147 additions & 0 deletions event_reporter/application/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package application

import (
"context"
"encoding/json"
"fmt"
appclient "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient"
"google.golang.org/grpc"
"io"
"net/http"
"strings"
"time"
)

type ApplicationClient interface {
Get(ctx context.Context, in *appclient.ApplicationQuery, opts ...grpc.CallOption) (*v1alpha1.Application, error)

RevisionMetadata(ctx context.Context, in *appclient.RevisionMetadataQuery, opts ...grpc.CallOption) (*v1alpha1.RevisionMetadata, error)

GetManifests(ctx context.Context, in *appclient.ApplicationManifestQuery, opts ...grpc.CallOption) (*repoapiclient.ManifestResponse, error)

ResourceTree(ctx context.Context, in *appclient.ResourcesQuery, opts ...grpc.CallOption) (*v1alpha1.ApplicationTree, error)

GetResource(ctx context.Context, in *appclient.ApplicationResourceRequest, opts ...grpc.CallOption) (*appclient.ApplicationResourceResponse, error)

List(ctx context.Context, in *appclient.ApplicationQuery, opts ...grpc.CallOption) (*v1alpha1.ApplicationList, error)
}

type httpApplicationClient struct {
httpClient *http.Client
baseUrl string
token string
}

func NewHttpApplicationClient(token string, address string) ApplicationClient {
if !strings.Contains(address, "http") {
address = "http://" + address
}

return &httpApplicationClient{
httpClient: &http.Client{
Timeout: 30 * time.Second,
},
baseUrl: address,
token: token,
}
}

func (c *httpApplicationClient) execute(ctx context.Context, url string, result interface{}, printBody ...bool) error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+c.token)

res, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

b, _ := io.ReadAll(res.Body)

isStatusOK := res.StatusCode >= 200 && res.StatusCode < 300
if !isStatusOK {
return fmt.Errorf("argocd server respond with code %d, msg is: %s", res.StatusCode, string(b))
}

err = json.Unmarshal(b, &result)
if err != nil {
return err
}
return nil
}

func (c *httpApplicationClient) Get(ctx context.Context, in *appclient.ApplicationQuery, opts ...grpc.CallOption) (*v1alpha1.Application, error) {
url := fmt.Sprintf("%s/api/v1/applications/%s", c.baseUrl, *in.Name)
application := &v1alpha1.Application{}
err := c.execute(ctx, url, application)
if err != nil {
return nil, err
}
return application, nil
}

func (c *httpApplicationClient) RevisionMetadata(ctx context.Context, in *appclient.RevisionMetadataQuery, opts ...grpc.CallOption) (*v1alpha1.RevisionMetadata, error) {
url := fmt.Sprintf("%s/api/v1/applications/%s/revisions/%s/metadata", c.baseUrl, *in.Name, *in.Revision)
revisionMetadata := &v1alpha1.RevisionMetadata{}
err := c.execute(ctx, url, revisionMetadata)
if err != nil {
return nil, err
}
return revisionMetadata, nil
}

func (c *httpApplicationClient) GetManifests(ctx context.Context, in *appclient.ApplicationManifestQuery, opts ...grpc.CallOption) (*repoapiclient.ManifestResponse, error) {
url := fmt.Sprintf("%s/api/v1/applications/%s/manifests", c.baseUrl, *in.Name)

manifest := &repoapiclient.ManifestResponse{}
err := c.execute(ctx, url, manifest)
if err != nil {
return nil, err
}
return manifest, nil
}

func (c *httpApplicationClient) ResourceTree(ctx context.Context, in *appclient.ResourcesQuery, opts ...grpc.CallOption) (*v1alpha1.ApplicationTree, error) {
url := fmt.Sprintf("%s/api/v1/applications/%s/resource-tree", c.baseUrl, *in.ApplicationName)
tree := &v1alpha1.ApplicationTree{}
err := c.execute(ctx, url, tree)
if err != nil {
return nil, err
}
return tree, nil
}

func (c *httpApplicationClient) GetResource(ctx context.Context, in *appclient.ApplicationResourceRequest, opts ...grpc.CallOption) (*appclient.ApplicationResourceResponse, error) {
params := fmt.Sprintf("?namespace=%s&resourceName=%s&version=%s&group=%s&kind=%s",
*in.Namespace,
*in.ResourceName,
*in.Version,
*in.Group,
*in.Kind)
url := fmt.Sprintf("%s/api/v1/applications/%s/resource%s", c.baseUrl, *in.Name, params)

applicationResource := &appclient.ApplicationResourceResponse{}
err := c.execute(ctx, url, applicationResource, true)
if err != nil {
return nil, err
}
return applicationResource, nil
}

func (c *httpApplicationClient) List(ctx context.Context, in *appclient.ApplicationQuery, opts ...grpc.CallOption) (*v1alpha1.ApplicationList, error) {
url := fmt.Sprintf("%s/api/v1/applications", c.baseUrl)

apps := &v1alpha1.ApplicationList{}
err := c.execute(ctx, url, apps)
if err != nil {
return nil, err
}
return apps, nil
}
6 changes: 3 additions & 3 deletions event_reporter/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
"math"
"strings"
"time"
Expand All @@ -10,7 +11,6 @@ import (
"github.com/argoproj/argo-cd/v2/event_reporter/codefresh"
"github.com/argoproj/argo-cd/v2/event_reporter/metrics"
"github.com/argoproj/argo-cd/v2/event_reporter/reporter"
applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
servercache "github.com/argoproj/argo-cd/v2/server/cache"
Expand All @@ -37,11 +37,11 @@ type eventReporterController struct {
applicationEventReporter reporter.ApplicationEventReporter
cache *servercache.Cache
appLister applisters.ApplicationLister
applicationServiceClient applicationpkg.ApplicationServiceClient
applicationServiceClient appclient.ApplicationClient
metricsServer *metrics.MetricsServer
}

func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient applicationpkg.ApplicationServiceClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager) EventReporterController {
func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager) EventReporterController {
appBroadcaster := reporter.NewBroadcaster(featureManager)
appInformer.AddEventHandler(appBroadcaster)
return &eventReporterController{
Expand Down
5 changes: 3 additions & 2 deletions event_reporter/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"encoding/json"
appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
"github.com/argoproj/argo-cd/v2/event_reporter/sharding"
applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"net/http"
Expand All @@ -10,10 +11,10 @@ import (
)

type RequestHandlers struct {
ApplicationServiceClient applicationpkg.ApplicationServiceClient
ApplicationServiceClient appclient.ApplicationClient
}

func GetRequestHandlers(applicationServiceClient applicationpkg.ApplicationServiceClient) *RequestHandlers {
func GetRequestHandlers(applicationServiceClient appclient.ApplicationClient) *RequestHandlers {
return &RequestHandlers{
ApplicationServiceClient: applicationServiceClient,
}
Expand Down
8 changes: 4 additions & 4 deletions event_reporter/reporter/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
"math"
"reflect"
"strings"
Expand All @@ -12,7 +13,6 @@ import (
argocommon "github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/event_reporter/codefresh"
"github.com/argoproj/argo-cd/v2/event_reporter/metrics"
applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
servercache "github.com/argoproj/argo-cd/v2/server/cache"
"github.com/argoproj/argo-cd/v2/util/env"
Expand All @@ -28,11 +28,11 @@ import (
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/yaml"

appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/events"
appv1reg "github.com/argoproj/argo-cd/v2/pkg/apis/application"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
)

var (
Expand All @@ -43,7 +43,7 @@ type applicationEventReporter struct {
cache *servercache.Cache
codefreshClient codefresh.CodefreshClient
appLister applisters.ApplicationLister
applicationServiceClient applicationpkg.ApplicationServiceClient
applicationServiceClient appclient.ApplicationClient
metricsServer *metrics.MetricsServer
}

Expand All @@ -59,7 +59,7 @@ type ApplicationEventReporter interface {
ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool)
}

func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient applicationpkg.ApplicationServiceClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer) ApplicationEventReporter {
func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer) ApplicationEventReporter {
return &applicationEventReporter{
cache: cache,
applicationServiceClient: applicationServiceClient,
Expand Down
4 changes: 2 additions & 2 deletions event_reporter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
"github.com/argoproj/argo-cd/v2/event_reporter/reporter"
"net"
"net/http"
Expand All @@ -16,7 +17,6 @@ import (
event_reporter "github.com/argoproj/argo-cd/v2/event_reporter/controller"
"github.com/argoproj/argo-cd/v2/event_reporter/handlers"
"github.com/argoproj/argo-cd/v2/event_reporter/metrics"
applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
appinformer "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
Expand Down Expand Up @@ -86,7 +86,7 @@ type EventReporterServerOpts struct {
KubeClientset kubernetes.Interface
AppClientset appclientset.Interface
RepoClientset repoapiclient.Clientset
ApplicationServiceClient applicationpkg.ApplicationServiceClient
ApplicationServiceClient appclient.ApplicationClient
Cache *servercache.Cache
RedisClient *redis.Client
ApplicationNamespaces []string
Expand Down

0 comments on commit 99e14b7

Please sign in to comment.