Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: event reporter use http for communication with argocd #269

Merged
merged 3 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
controller: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-application-controller $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''}"
api-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-server $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --disable-auth=${ARGOCD_E2E_DISABLE_AUTH:-'true'} --insecure --dex-server http://localhost:${ARGOCD_E2E_DEX_PORT:-5556} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --port ${ARGOCD_E2E_APISERVER_PORT:-8080} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''}"
dex: sh -c "ARGOCD_BINARY_NAME=argocd-dex go run github.com/argoproj/argo-cd/v2/cmd gendexcfg -o `pwd`/dist/dex.yaml && (test -f dist/dex.yaml || { echo 'Failed to generate dex configuration'; exit 1; }) && docker run --rm -p ${ARGOCD_E2E_DEX_PORT:-5556}:${ARGOCD_E2E_DEX_PORT:-5556} -v `pwd`/dist/dex.yaml:/dex.yaml ghcr.io/dexidp/dex:$(grep "image: ghcr.io/dexidp/dex" manifests/base/dex/argocd-dex-server-deployment.yaml | cut -d':' -f3) dex serve /dex.yaml"
redis: bash -c "if [ \"$ARGOCD_REDIS_LOCAL\" = 'true' ]; then redis-server --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; else docker run --rm --name argocd-redis -i -p ${ARGOCD_E2E_REDIS_PORT:-6379}:${ARGOCD_E2E_REDIS_PORT:-6379} docker.io/library/redis:$(grep "image: redis" manifests/base/redis/argocd-redis-deployment.yaml | cut -d':' -f3) --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; fi"
redis: bash -c "if [ \"$ARGOCD_REDIS_LOCAL\" = 'true' ]; then redis-server --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; else docker run --rm --name argocd-redis -i -p ${ARGOCD_E2E_REDIS_PORT:-6379}:${ARGOCD_E2E_REDIS_PORT:-6379} docker.io/library/redis:7.0.14-alpine --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; fi"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should remove this?

repo-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_GNUPGHOME=${ARGOCD_GNUPGHOME:-/tmp/argocd-local/gpg/keys} ARGOCD_PLUGINSOCKFILEPATH=${ARGOCD_PLUGINSOCKFILEPATH:-./test/cmp} ARGOCD_GPG_DATA_PATH=${ARGOCD_GPG_DATA_PATH:-/tmp/argocd-local/gpg/source} ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-repo-server ARGOCD_GPG_ENABLED=${ARGOCD_GPG_ENABLED:-false} $COMMAND --loglevel debug --port ${ARGOCD_E2E_REPOSERVER_PORT:-8081} --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --otlp-address=${ARGOCD_OTLP_ADDRESS}"
cmp-server: [ "$ARGOCD_E2E_TEST" = 'true' ] && exit 0 || [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_BINARY_NAME=argocd-cmp-server ARGOCD_PLUGINSOCKFILEPATH=${ARGOCD_PLUGINSOCKFILEPATH:-./test/cmp} $COMMAND --config-dir-path ./test/cmp --loglevel debug --otlp-address=${ARGOCD_OTLP_ADDRESS}"
ui: sh -c 'cd ui && ${ARGOCD_E2E_YARN_CMD:-yarn} start'
Expand Down
46 changes: 27 additions & 19 deletions cmd/event-reporter-server/commands/event_reporter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/argoproj/argo-cd/v2/event_reporter"
appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
"github.com/argoproj/argo-cd/v2/event_reporter/codefresh"
"github.com/argoproj/argo-cd/v2/pkg/apiclient"

Expand Down Expand Up @@ -45,6 +46,27 @@ func init() {
failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, failureRetryPeriodMilliSeconds, 0, 1000)
}

func getApplicationClient(useGrpc bool, address, token string) appclient.ApplicationClient {
if useGrpc {
applicationClientSet, err := apiclient.NewClient(&apiclient.ClientOptions{
ServerAddr: address,
Insecure: true,
GRPCWeb: true,
PlainText: true,
AuthToken: token,
})

errors.CheckError(err)

_, applicationClient, err := applicationClientSet.NewApplicationClient()

errors.CheckError(err)

return applicationClient
}
return appclient.NewHttpApplicationClient(token, address)
}

// NewCommand returns a new instance of an event reporter command
func NewCommand() *cobra.Command {
var (
Expand All @@ -68,6 +90,7 @@ func NewCommand() *cobra.Command {
codefreshUrl string
codefreshToken string
shardingAlgorithm string
useGrpc bool
)
var command = &cobra.Command{
Use: cliName,
Expand Down Expand Up @@ -130,24 +153,6 @@ func NewCommand() *cobra.Command {

repoclientset := repoapiclient.NewRepoServerClientset(repoServerAddress, repoServerTimeoutSeconds, tlsConfig)

applicationClientSet, err := apiclient.NewClient(&apiclient.ClientOptions{
ServerAddr: applicationServerAddress,
Insecure: true,
GRPCWeb: true,
PlainText: true,
AuthToken: argocdToken,
})

errors.CheckError(err)

closer, applicationClient, err := applicationClientSet.NewApplicationClient()

errors.CheckError(err)

defer func() {
_ = closer.Close()
}()

eventReporterServerOpts := event_reporter.EventReporterServerOpts{
ListenPort: listenPort,
ListenHost: listenHost,
Expand All @@ -160,13 +165,15 @@ func NewCommand() *cobra.Command {
Cache: cache,
RedisClient: redisClient,
ApplicationNamespaces: applicationNamespaces,
ApplicationServiceClient: applicationClient,
ApplicationServiceClient: getApplicationClient(useGrpc, applicationServerAddress, argocdToken),
CodefreshConfig: &codefresh.CodefreshConfig{
BaseURL: codefreshUrl,
AuthToken: codefreshToken,
},
}

log.Infof("Starting event reporter server with grpc transport %v", useGrpc)

stats.RegisterStackDumper()
stats.StartStatsTicker(10 * time.Minute)
stats.RegisterHeapDumper("memprofile")
Expand Down Expand Up @@ -206,6 +213,7 @@ func NewCommand() *cobra.Command {
command.Flags().StringVar(&codefreshUrl, "codefresh-url", env.StringFromEnv("CODEFRESH_URL", "https://g.codefresh.io"), "Codefresh API url")
command.Flags().StringVar(&codefreshToken, "codefresh-token", env.StringFromEnv("CODEFRESH_TOKEN", ""), "Codefresh token")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvEventReporterShardingAlgorithm, common.DefaultEventReporterShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy] ")
command.Flags().BoolVar(&useGrpc, "grpc", env.ParseBoolFromEnv("USE_GRPC", true), "Use grpc for interact with argocd server")
cacheSrc = servercache.AddCacheFlagsToCmd(command, func(client *redis.Client) {
redisClient = client
})
Expand Down
148 changes: 148 additions & 0 deletions event_reporter/application/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package application

import (
"context"
"encoding/json"
"errors"
"fmt"
appclient "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient"
"google.golang.org/grpc"
"io"
"net/http"
"strings"
"time"
)

type ApplicationClient interface {
Get(ctx context.Context, in *appclient.ApplicationQuery, opts ...grpc.CallOption) (*v1alpha1.Application, error)

RevisionMetadata(ctx context.Context, in *appclient.RevisionMetadataQuery, opts ...grpc.CallOption) (*v1alpha1.RevisionMetadata, error)

GetManifests(ctx context.Context, in *appclient.ApplicationManifestQuery, opts ...grpc.CallOption) (*repoapiclient.ManifestResponse, error)

ResourceTree(ctx context.Context, in *appclient.ResourcesQuery, opts ...grpc.CallOption) (*v1alpha1.ApplicationTree, error)

GetResource(ctx context.Context, in *appclient.ApplicationResourceRequest, opts ...grpc.CallOption) (*appclient.ApplicationResourceResponse, error)

List(ctx context.Context, in *appclient.ApplicationQuery, opts ...grpc.CallOption) (*v1alpha1.ApplicationList, error)
}

type httpApplicationClient struct {
httpClient *http.Client
baseUrl string
token string
}

func NewHttpApplicationClient(token string, address string) ApplicationClient {
if !strings.Contains(address, "http") {
address = "http://" + address
}

return &httpApplicationClient{
httpClient: &http.Client{
Timeout: 30 * time.Second,
},
baseUrl: address,
token: token,
}
}

func (c *httpApplicationClient) execute(ctx context.Context, url string, result interface{}, printBody ...bool) error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+c.token)

res, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

b, _ := io.ReadAll(res.Body)

isStatusOK := res.StatusCode >= 200 && res.StatusCode < 300
if !isStatusOK {
return errors.New(fmt.Sprintf("argocd server respond with code %d, msg is: %s", res.StatusCode, string(b)))

Check failure on line 71 in event_reporter/application/client.go

View workflow job for this annotation

GitHub Actions / Lint Go code

S1028: should use fmt.Errorf(...) instead of errors.New(fmt.Sprintf(...)) (gosimple)
}

err = json.Unmarshal(b, &result)
if err != nil {
return err
}
return nil
}

func (c *httpApplicationClient) Get(ctx context.Context, in *appclient.ApplicationQuery, opts ...grpc.CallOption) (*v1alpha1.Application, error) {
url := fmt.Sprintf("%s/api/v1/applications/%s", c.baseUrl, *in.Name)
application := &v1alpha1.Application{}
err := c.execute(ctx, url, application)
if err != nil {
return nil, err
}
return application, nil
}

func (c *httpApplicationClient) RevisionMetadata(ctx context.Context, in *appclient.RevisionMetadataQuery, opts ...grpc.CallOption) (*v1alpha1.RevisionMetadata, error) {
url := fmt.Sprintf("%s/api/v1/applications/%s/revisions/%s/metadata", c.baseUrl, *in.Name, *in.Revision)
revisionMetadata := &v1alpha1.RevisionMetadata{}
err := c.execute(ctx, url, revisionMetadata)
if err != nil {
return nil, err
}
return revisionMetadata, nil
}

func (c *httpApplicationClient) GetManifests(ctx context.Context, in *appclient.ApplicationManifestQuery, opts ...grpc.CallOption) (*repoapiclient.ManifestResponse, error) {
url := fmt.Sprintf("%s/api/v1/applications/%s/manifests", c.baseUrl, *in.Name)

manifest := &repoapiclient.ManifestResponse{}
err := c.execute(ctx, url, manifest)
if err != nil {
return nil, err
}
return manifest, nil
}

func (c *httpApplicationClient) ResourceTree(ctx context.Context, in *appclient.ResourcesQuery, opts ...grpc.CallOption) (*v1alpha1.ApplicationTree, error) {
url := fmt.Sprintf("%s/api/v1/applications/%s/resource-tree", c.baseUrl, *in.ApplicationName)
tree := &v1alpha1.ApplicationTree{}
err := c.execute(ctx, url, tree)
if err != nil {
return nil, err
}
return tree, nil
}

func (c *httpApplicationClient) GetResource(ctx context.Context, in *appclient.ApplicationResourceRequest, opts ...grpc.CallOption) (*appclient.ApplicationResourceResponse, error) {
params := fmt.Sprintf("?namespace=%s&resourceName=%s&version=%s&group=%s&kind=%s",
*in.Namespace,
*in.ResourceName,
*in.Version,
*in.Group,
*in.Kind)
url := fmt.Sprintf("%s/api/v1/applications/%s/resource%s", c.baseUrl, *in.Name, params)

applicationResource := &appclient.ApplicationResourceResponse{}
err := c.execute(ctx, url, applicationResource, true)
if err != nil {
return nil, err
}
return applicationResource, nil
}

func (c *httpApplicationClient) List(ctx context.Context, in *appclient.ApplicationQuery, opts ...grpc.CallOption) (*v1alpha1.ApplicationList, error) {
url := fmt.Sprintf("%s/api/v1/applications", c.baseUrl)

apps := &v1alpha1.ApplicationList{}
err := c.execute(ctx, url, apps)
if err != nil {
return nil, err
}
return apps, nil
}
6 changes: 3 additions & 3 deletions event_reporter/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
"math"
"strings"
"time"
Expand All @@ -10,7 +11,6 @@ import (
"github.com/argoproj/argo-cd/v2/event_reporter/codefresh"
"github.com/argoproj/argo-cd/v2/event_reporter/metrics"
"github.com/argoproj/argo-cd/v2/event_reporter/reporter"
applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
servercache "github.com/argoproj/argo-cd/v2/server/cache"
Expand All @@ -37,11 +37,11 @@ type eventReporterController struct {
applicationEventReporter reporter.ApplicationEventReporter
cache *servercache.Cache
appLister applisters.ApplicationLister
applicationServiceClient applicationpkg.ApplicationServiceClient
applicationServiceClient appclient.ApplicationClient
metricsServer *metrics.MetricsServer
}

func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient applicationpkg.ApplicationServiceClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager) EventReporterController {
func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager) EventReporterController {
appBroadcaster := reporter.NewBroadcaster(featureManager)
appInformer.AddEventHandler(appBroadcaster)
return &eventReporterController{
Expand Down
5 changes: 3 additions & 2 deletions event_reporter/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"encoding/json"
appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
"github.com/argoproj/argo-cd/v2/event_reporter/sharding"
applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"net/http"
Expand All @@ -10,10 +11,10 @@ import (
)

type RequestHandlers struct {
ApplicationServiceClient applicationpkg.ApplicationServiceClient
ApplicationServiceClient appclient.ApplicationClient
}

func GetRequestHandlers(applicationServiceClient applicationpkg.ApplicationServiceClient) *RequestHandlers {
func GetRequestHandlers(applicationServiceClient appclient.ApplicationClient) *RequestHandlers {
return &RequestHandlers{
ApplicationServiceClient: applicationServiceClient,
}
Expand Down
8 changes: 4 additions & 4 deletions event_reporter/reporter/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
"math"
"reflect"
"strings"
Expand All @@ -12,7 +13,6 @@ import (
argocommon "github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/event_reporter/codefresh"
"github.com/argoproj/argo-cd/v2/event_reporter/metrics"
applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
servercache "github.com/argoproj/argo-cd/v2/server/cache"
"github.com/argoproj/argo-cd/v2/util/env"
Expand All @@ -28,11 +28,11 @@ import (
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/yaml"

appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/events"
appv1reg "github.com/argoproj/argo-cd/v2/pkg/apis/application"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
)

var (
Expand All @@ -43,7 +43,7 @@ type applicationEventReporter struct {
cache *servercache.Cache
codefreshClient codefresh.CodefreshClient
appLister applisters.ApplicationLister
applicationServiceClient applicationpkg.ApplicationServiceClient
applicationServiceClient appclient.ApplicationClient
metricsServer *metrics.MetricsServer
}

Expand All @@ -59,7 +59,7 @@ type ApplicationEventReporter interface {
ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool)
}

func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient applicationpkg.ApplicationServiceClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer) ApplicationEventReporter {
func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer) ApplicationEventReporter {
return &applicationEventReporter{
cache: cache,
applicationServiceClient: applicationServiceClient,
Expand Down
4 changes: 2 additions & 2 deletions event_reporter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
"github.com/argoproj/argo-cd/v2/event_reporter/reporter"
"net"
"net/http"
Expand All @@ -16,7 +17,6 @@ import (
event_reporter "github.com/argoproj/argo-cd/v2/event_reporter/controller"
"github.com/argoproj/argo-cd/v2/event_reporter/handlers"
"github.com/argoproj/argo-cd/v2/event_reporter/metrics"
applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
appinformer "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
Expand Down Expand Up @@ -86,7 +86,7 @@ type EventReporterServerOpts struct {
KubeClientset kubernetes.Interface
AppClientset appclientset.Interface
RepoClientset repoapiclient.Clientset
ApplicationServiceClient applicationpkg.ApplicationServiceClient
ApplicationServiceClient appclient.ApplicationClient
Cache *servercache.Cache
RedisClient *redis.Client
ApplicationNamespaces []string
Expand Down
Loading