diff --git a/assets/swagger.json b/assets/swagger.json index 1e467c4aa18bc..3d1d39da23a2c 100644 --- a/assets/swagger.json +++ b/assets/swagger.json @@ -4056,52 +4056,6 @@ } } }, - "/api/v1/stream/events": { - "get": { - "tags": [ - "Eventing" - ], - "operationId": "Eventing_StartEventSource", - "parameters": [ - { - "type": "string", - "description": "The event source name.", - "name": "name", - "in": "query" - }, - { - "type": "string", - "format": "byte", - "description": "The event source configuration value.", - "name": "config", - "in": "query" - } - ], - "responses": { - "200": { - "description": "A successful response.(streaming responses)", - "schema": { - "type": "object", - "title": "Stream result of genericEvent", - "properties": { - "error": { - "$ref": "#/definitions/runtimeStreamError" - }, - "result": { - "$ref": "#/definitions/genericEvent" - } - } - } - }, - "default": { - "description": "An unexpected error response.", - "schema": { - "$ref": "#/definitions/runtimeError" - } - } - } - } - }, "/api/version": { "get": { "tags": [ @@ -4790,21 +4744,6 @@ } } }, - "genericEvent": { - "type": "object", - "title": "*\nRepresents an event", - "properties": { - "name": { - "description": "The event source name.", - "type": "string" - }, - "payload": { - "description": "The event payload.", - "type": "string", - "format": "byte" - } - } - }, "gpgkeyGnuPGPublicKeyCreateResponse": { "type": "object", "title": "Response to a public key creation request", diff --git a/changelog/CHANGELOG.md b/changelog/CHANGELOG.md index 44f728a89a47b..ec07d1005bc37 100644 --- a/changelog/CHANGELOG.md +++ b/changelog/CHANGELOG.md @@ -1,2 +1,2 @@ ### Chore -- chore: selfheal should work with monorepo additional logs \ No newline at end of file +- chore: removed v1 reporter \ No newline at end of file diff --git a/controller/appcontroller_test.go b/controller/appcontroller_test.go index cc40516123e0a..eba264d27cf63 100644 --- a/controller/appcontroller_test.go +++ b/controller/appcontroller_test.go @@ -1984,7 +1984,7 @@ func TestAlreadyAttemptSync(t *testing.T) { t.Run("different manifest with sync result, with enabled flag", func(t *testing.T) { _ = os.Setenv("PERSIST_CHANGE_REVISIONS", "1") - + attempted, _ := alreadyAttemptedSync(app, "sha", []string{}, false, nil) assert.False(t, attempted) }) diff --git a/docs/operator-manual/notifications/services/opsgenie.md b/docs/operator-manual/notifications/services/opsgenie.md index c590a4ac979b6..2cc1ebff62abf 100755 --- a/docs/operator-manual/notifications/services/opsgenie.md +++ b/docs/operator-manual/notifications/services/opsgenie.md @@ -7,22 +7,58 @@ To be able to send notifications with argocd-notifications you have to create an 3. Click "Teams" in the Menu on the left 4. Select the team that you want to notify 5. In the teams configuration menu select "Integrations" -6. click "Add Integration" in the top right corner +6. Click "Add Integration" in the top right corner 7. Select "API" integration 8. Give your integration a name, copy the "API key" and safe it somewhere for later -9. Make sure the checkboxes for "Create and Update Access" and "enable" are selected, disable the other checkboxes to remove unnecessary permissions -10. Click "Safe Integration" at the bottom -11. Check your browser for the correct server apiURL. If it is "app.opsgenie.com" then use the US/international api url `api.opsgenie.com` in the next step, otherwise use `api.eu.opsgenie.com` (European API). -12. You are finished with configuring opsgenie. Now you need to configure argocd-notifications. Use the apiUrl, the team name and the apiKey to configure the Opsgenie integration in the `argocd-notifications-secret` secret. +9. Click "Edit" in the integration settings +10. Make sure the checkbox for "Create and Update Access" is selected, disable the other checkboxes to remove unnecessary permissions +11. Click "Save" at the bottom +12. Click "Turn on integration" in the top right corner +13. Check your browser for the correct server apiURL. If it is "app.opsgenie.com" then use the US/international api url `api.opsgenie.com` in the next step, otherwise use `api.eu.opsgenie.com` (European API). +14. You are finished with configuring Opsgenie. Now you need to configure argocd-notifications. Use the apiUrl, the team name and the apiKey to configure the Opsgenie integration in the `argocd-notifications-secret` secret. +15. You can find the example `argocd-notifications-cm` configuration at the below. + +| **Option** | **Required** | **Type** | **Description** | **Example** | +| ------------- | ------------ | -------- | -------------------------------------------------------------------------------------------------------- | -------------------------------- | +| `description` | True | `string` | Description field of the alert that is generally used to provide a detailed information about the alert. | `Hello from Argo CD!` | +| `priority` | False | `string` | Priority level of the alert. Possible values are P1, P2, P3, P4 and P5. Default value is P3. | `P1` | +| `alias` | False | `string` | Client-defined identifier of the alert, that is also the key element of Alert De-Duplication. | `Life is too short for no alias` | +| `note` | False | `string` | Additional note that will be added while creating the alert. | `Error from Argo CD!` | ```yaml apiVersion: v1 kind: ConfigMap metadata: - name: + name: argocd-notifications-cm data: service.opsgenie: | apiUrl: apiKeys: : + template.opsgenie: | + message: | + [Argo CD] Application {{.app.metadata.name}} has a problem. + opsgenie: + description: | + Application: {{.app.metadata.name}} + Health Status: {{.app.status.health.status}} + Operation State Phase: {{.app.status.operationState.phase}} + Sync Status: {{.app.status.sync.status}} + priority: P1 + alias: {{.app.metadata.name}} + note: Error from Argo CD! + trigger.on-a-problem: | + - description: Application has a problem. + send: + - opsgenie + when: app.status.health.status == 'Degraded' or app.status.operationState.phase in ['Error', 'Failed'] or app.status.sync.status == 'Unknown' +``` + +16. Add annotation in application yaml file to enable notifications for specific Argo CD app. +```yaml + apiVersion: argoproj.io/v1alpha1 + kind: Application + metadata: + annotations: + notifications.argoproj.io/subscribe.on-a-problem.opsgenie: ``` \ No newline at end of file diff --git a/server/application/applications_errors_parser_test.go b/event_reporter/reporter/applications_errors_parser_test.go similarity index 99% rename from server/application/applications_errors_parser_test.go rename to event_reporter/reporter/applications_errors_parser_test.go index a028d803f6438..7a4adc7e75a1c 100644 --- a/server/application/applications_errors_parser_test.go +++ b/event_reporter/reporter/applications_errors_parser_test.go @@ -1,4 +1,4 @@ -package application +package reporter import ( "fmt" diff --git a/event_reporter/reporter/broadcaster.go b/event_reporter/reporter/broadcaster.go index 629e3bf9d7279..e4d3b6629e104 100644 --- a/event_reporter/reporter/broadcaster.go +++ b/event_reporter/reporter/broadcaster.go @@ -63,11 +63,6 @@ func (b *broadcasterHandler) notify(event *appv1.ApplicationWatchEvent) { subscribers = append(subscribers, b.subscribers...) b.lock.Unlock() - if !b.featureManager.ShouldReporterRun() { - log.Infof("filtering application '%s', event reporting is turned off and old one is in use", event.Application.Name) - return - } - if b.filter != nil { result, expectedShard := b.filter(&event.Application) if !result { diff --git a/event_reporter/reporter/feature_manager.go b/event_reporter/reporter/feature_manager.go index c483ec9191b8e..bbfd34121736e 100644 --- a/event_reporter/reporter/feature_manager.go +++ b/event_reporter/reporter/feature_manager.go @@ -2,39 +2,12 @@ package reporter import ( settings_util "github.com/argoproj/argo-cd/v2/util/settings" - log "github.com/sirupsen/logrus" - "time" ) type FeatureManager struct { settingsMgr *settings_util.SettingsManager - shouldRun bool } func NewFeatureManager(settingsMgr *settings_util.SettingsManager) *FeatureManager { return &FeatureManager{settingsMgr: settingsMgr} } - -func (f *FeatureManager) setShouldRun() { - reporterVersion, err := f.settingsMgr.GetCodefreshReporterVersion() - if err != nil { - log.Warnf("Failed to get reporter version: %v", err) - f.shouldRun = false - return - } - f.shouldRun = reporterVersion == string(settings_util.CodefreshV2ReporterVersion) -} - -func (f *FeatureManager) Watch() { - f.setShouldRun() - // nolint:staticcheck - tick := time.Tick(5 * time.Second) - for { - <-tick - f.setShouldRun() - } -} - -func (f *FeatureManager) ShouldReporterRun() bool { - return f.shouldRun -} diff --git a/event_reporter/server.go b/event_reporter/server.go index a90eb2d2600bc..4154ac39c1dc2 100644 --- a/event_reporter/server.go +++ b/event_reporter/server.go @@ -146,7 +146,6 @@ func (a *EventReporterServer) healthCheck(r *http.Request) error { // Init starts informers used by the API server func (a *EventReporterServer) Init(ctx context.Context) { go a.appInformer.Run(ctx.Done()) - go a.featureManager.Watch() svcSet := newEventReporterServiceSet(a) a.serviceSet = svcSet } diff --git a/go.mod b/go.mod index 91e9dfece2122..99d8c10f80812 100644 --- a/go.mod +++ b/go.mod @@ -186,7 +186,7 @@ require ( github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect github.com/fatih/camelcase v1.0.0 // indirect github.com/fvbommel/sortorder v1.0.1 // indirect - github.com/ghodss/yaml v1.0.0 + github.com/ghodss/yaml v1.0.0 // indirect github.com/go-errors/errors v1.4.2 // indirect github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect github.com/go-git/go-billy/v5 v5.5.0 // indirect diff --git a/pkg/apiclient/events/events.pb.go b/pkg/apiclient/events/events.pb.go index 5f19438b95c2f..d00ffe18cc196 100644 --- a/pkg/apiclient/events/events.pb.go +++ b/pkg/apiclient/events/events.pb.go @@ -9,16 +9,12 @@ package events import ( - context "context" fmt "fmt" _ "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" _ "github.com/gogo/protobuf/gogoproto" github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto" proto "github.com/gogo/protobuf/proto" _ "google.golang.org/genproto/googleapis/api/annotations" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" io "io" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" math "math" @@ -674,178 +670,67 @@ func init() { func init() { proto.RegisterFile("server/application/events.proto", fileDescriptor_3ad9267ec62b112f) } var fileDescriptor_3ad9267ec62b112f = []byte{ - // 1009 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xdd, 0x6e, 0x1c, 0xb5, - 0x17, 0xff, 0xcf, 0xe6, 0xfb, 0xec, 0x26, 0xed, 0xdf, 0x49, 0x5a, 0x2b, 0x0a, 0xe9, 0x6a, 0x15, - 0xa1, 0x55, 0xd5, 0xce, 0x92, 0xf0, 0xa1, 0xb6, 0x42, 0x48, 0xa9, 0x12, 0x44, 0x20, 0x01, 0x34, - 0xa1, 0x95, 0xe8, 0x9d, 0x33, 0x73, 0x3a, 0xeb, 0xee, 0x8c, 0x6d, 0x6c, 0xef, 0x4a, 0x7b, 0x8b, - 0x78, 0x03, 0x9e, 0x84, 0xb7, 0xe8, 0x25, 0x12, 0xf7, 0x08, 0x45, 0x48, 0xbc, 0x06, 0xb2, 0x67, - 0x76, 0xe3, 0x09, 0xb9, 0xa0, 0xdc, 0xf9, 0xfc, 0xce, 0xef, 0x1c, 0xfb, 0xcc, 0xf9, 0x1a, 0x78, - 0x60, 0x50, 0x4f, 0x50, 0x0f, 0x98, 0x52, 0x05, 0x4f, 0x99, 0xe5, 0x52, 0x0c, 0x70, 0x82, 0xc2, - 0x9a, 0x58, 0x69, 0x69, 0x25, 0x59, 0xc9, 0x51, 0xa0, 0xe6, 0xe9, 0xce, 0x59, 0xce, 0xed, 0x70, - 0x7c, 0x19, 0xa7, 0xb2, 0x1c, 0x30, 0x9d, 0x4b, 0xa5, 0xe5, 0x1b, 0x7f, 0x78, 0x9c, 0x66, 0x83, - 0xc9, 0xe1, 0x40, 0x8d, 0xf2, 0x01, 0x53, 0xdc, 0x34, 0x5c, 0x4d, 0x0e, 0x58, 0xa1, 0x86, 0xec, - 0x60, 0xe0, 0xbd, 0x30, 0x8b, 0x59, 0xe5, 0x76, 0xe7, 0xa3, 0xd1, 0x13, 0x13, 0x73, 0xe9, 0x2c, - 0x4a, 0x96, 0x0e, 0xb9, 0x40, 0x3d, 0xbd, 0x76, 0x51, 0xa2, 0x65, 0x83, 0xc9, 0x3f, 0xad, 0xb6, - 0x72, 0xe9, 0x2f, 0xb6, 0x72, 0xe0, 0x4e, 0x35, 0xba, 0x9b, 0x4b, 0x99, 0x17, 0xe8, 0x4c, 0x07, - 0x4c, 0x08, 0x69, 0xfd, 0xdd, 0x75, 0x00, 0xbd, 0xa7, 0xd0, 0x3e, 0x71, 0x01, 0x5d, 0xc8, 0xb1, - 0x4e, 0x91, 0x10, 0x58, 0x14, 0xac, 0x44, 0x1a, 0x75, 0x5b, 0xfd, 0xb5, 0xc4, 0x9f, 0xc9, 0x3d, - 0x58, 0x4e, 0xa5, 0x78, 0xcd, 0x73, 0xda, 0xea, 0x46, 0xfd, 0x4e, 0x52, 0x4b, 0xbd, 0x8f, 0x61, - 0xc9, 0x9b, 0xde, 0x6a, 0x44, 0x61, 0x45, 0xb1, 0x69, 0x21, 0x59, 0x46, 0x5b, 0xdd, 0x56, 0xbf, - 0x93, 0xcc, 0xc4, 0xde, 0x5f, 0x11, 0x74, 0xbc, 0xdd, 0xb7, 0x15, 0x40, 0x7a, 0xb0, 0x66, 0x79, - 0x89, 0xc6, 0xb2, 0x52, 0x55, 0x3e, 0x9e, 0x2f, 0xbe, 0xfd, 0xfd, 0xc1, 0xff, 0x92, 0x6b, 0xd8, - 0xbd, 0x41, 0x5e, 0xbe, 0xc1, 0xd4, 0xd6, 0xde, 0x6a, 0x89, 0x3c, 0x86, 0x65, 0xe3, 0x5f, 0x4e, - 0x17, 0xba, 0xad, 0x7e, 0xfb, 0x70, 0x3b, 0xae, 0x13, 0x12, 0x7f, 0xe3, 0x09, 0x55, 0x58, 0x49, - 0x4d, 0x22, 0x8f, 0x60, 0x19, 0xb5, 0x96, 0xda, 0xd0, 0xc5, 0xee, 0x42, 0xbf, 0x7d, 0xb8, 0x75, - 0x83, 0x7e, 0xe2, 0x94, 0x49, 0xcd, 0x21, 0x9f, 0x41, 0x9b, 0x29, 0xf5, 0x12, 0xb5, 0x71, 0x1f, - 0x8c, 0x2e, 0x75, 0xa3, 0x7e, 0xfb, 0x70, 0x77, 0x6e, 0x72, 0x74, 0x9d, 0xc9, 0x19, 0x27, 0x09, - 0x0d, 0x7a, 0x3f, 0xad, 0x41, 0x27, 0x7c, 0x06, 0x89, 0xe1, 0x4e, 0x86, 0x86, 0x6b, 0xcc, 0xce, - 0x99, 0xe0, 0xaf, 0xd1, 0x58, 0x1a, 0x75, 0xa3, 0x79, 0xbc, 0x37, 0x95, 0xe4, 0x11, 0x6c, 0xb0, - 0xd4, 0x8e, 0x59, 0x31, 0xa7, 0xb7, 0x02, 0xfa, 0x0d, 0x1d, 0x79, 0x1f, 0xda, 0x39, 0xb7, 0x73, - 0xea, 0x42, 0x40, 0x0d, 0x15, 0x64, 0x0f, 0x56, 0x34, 0x2a, 0xf9, 0x22, 0x39, 0xa3, 0x8b, 0x01, - 0x67, 0x06, 0x12, 0x0a, 0x8b, 0x8a, 0xd9, 0xa1, 0x8f, 0x77, 0xa6, 0xf4, 0x08, 0xe9, 0xc2, 0xaa, - 0xc6, 0x09, 0x77, 0xd1, 0xd1, 0xe5, 0x40, 0x3b, 0x47, 0xc9, 0x43, 0x58, 0x4f, 0x65, 0x59, 0x72, - 0x7b, 0x8e, 0xc6, 0xb0, 0x1c, 0xe9, 0x4a, 0x40, 0x6b, 0xaa, 0x48, 0x1f, 0x3a, 0x15, 0x70, 0x34, - 0xb6, 0x43, 0xa9, 0xe9, 0x6a, 0x40, 0x6d, 0x68, 0xc8, 0x97, 0x00, 0x95, 0x7c, 0xcc, 0x2c, 0xd2, - 0x35, 0x9f, 0x87, 0x87, 0x71, 0xd5, 0x23, 0x71, 0xd8, 0x23, 0xb1, 0x1a, 0xe5, 0x0e, 0x30, 0xb1, - 0xeb, 0x91, 0x78, 0x72, 0x10, 0x7f, 0xc7, 0x4b, 0x4c, 0x02, 0x6b, 0x17, 0x3d, 0x53, 0xea, 0x6b, - 0x57, 0xaf, 0x10, 0x46, 0x5f, 0x83, 0xe4, 0x0b, 0x58, 0x63, 0x4a, 0x9d, 0xb1, 0x4b, 0x2c, 0x0c, - 0x6d, 0xfb, 0x2a, 0xd9, 0xbf, 0xb5, 0xa8, 0x5c, 0xfe, 0x2b, 0xda, 0x89, 0xb0, 0x7a, 0x3a, 0xab, - 0xd9, 0xb9, 0x31, 0xd9, 0x07, 0x30, 0x53, 0x91, 0x5e, 0x58, 0x66, 0xc7, 0x86, 0x76, 0x82, 0xcb, - 0x02, 0x9c, 0xbc, 0x84, 0xf5, 0x5a, 0xd2, 0x16, 0xb3, 0x23, 0x4b, 0xd7, 0xdf, 0x35, 0xbc, 0xd9, - 0xd7, 0x6d, 0xb8, 0x21, 0x09, 0x6c, 0x38, 0xe0, 0x73, 0x2e, 0xb8, 0x19, 0x7a, 0xc7, 0x1b, 0xef, - 0xfc, 0xdd, 0x6e, 0x78, 0x20, 0x3d, 0xe8, 0x0c, 0x91, 0x15, 0x76, 0x58, 0xc7, 0x74, 0xc7, 0xc5, - 0x94, 0x34, 0x30, 0xb2, 0x0f, 0xeb, 0x95, 0x3c, 0xab, 0x80, 0xbb, 0x9e, 0xd4, 0x04, 0x5d, 0x16, - 0xd2, 0x62, 0x6c, 0x2c, 0x6a, 0xfa, 0xff, 0x30, 0x0b, 0x35, 0xe8, 0x66, 0xc2, 0x90, 0x1b, 0x2b, - 0xf5, 0xf4, 0x34, 0xa3, 0xa4, 0x1b, 0xf5, 0x17, 0x66, 0xdf, 0x77, 0x0e, 0x93, 0x67, 0xb0, 0x2d, - 0x95, 0x1b, 0x80, 0x5c, 0x8a, 0x8b, 0xa9, 0x48, 0x93, 0x59, 0x69, 0x6e, 0x06, 0x1e, 0x6f, 0xa7, - 0x90, 0x5d, 0x58, 0x66, 0x4a, 0xbd, 0x38, 0x3d, 0xa6, 0x5b, 0x01, 0xb9, 0xc6, 0x5c, 0x65, 0xd6, - 0xe5, 0x60, 0x14, 0x4b, 0x91, 0x6e, 0x87, 0x95, 0x19, 0x6a, 0xc8, 0x27, 0xb0, 0xc9, 0x94, 0x3a, - 0x15, 0xc6, 0x32, 0x91, 0xa2, 0x4f, 0xfc, 0x57, 0x38, 0xa5, 0xf7, 0x02, 0x83, 0xdb, 0x08, 0xae, - 0xb3, 0xad, 0x66, 0xe9, 0x88, 0x8b, 0xfc, 0x1c, 0xed, 0x50, 0x66, 0xf4, 0x7e, 0xd8, 0xd9, 0x4d, - 0xdd, 0xce, 0xa7, 0xb0, 0xd1, 0x2c, 0x36, 0x72, 0x17, 0x16, 0x46, 0x38, 0xad, 0xa6, 0x47, 0xe2, - 0x8e, 0x64, 0x0b, 0x96, 0x26, 0xac, 0x18, 0x63, 0x35, 0x22, 0x92, 0x4a, 0x78, 0xd6, 0x7a, 0x12, - 0xf5, 0x7e, 0x89, 0xa0, 0x1d, 0x8c, 0x37, 0xd7, 0xdf, 0x76, 0xaa, 0xb0, 0x31, 0x7a, 0x3c, 0x42, - 0x76, 0x60, 0xa9, 0xc0, 0x09, 0x16, 0x8d, 0x31, 0x53, 0x41, 0x2e, 0x63, 0x65, 0x9d, 0xd1, 0x70, - 0xb2, 0xcc, 0x40, 0x72, 0x06, 0xab, 0x05, 0x33, 0xf6, 0x02, 0x51, 0xf8, 0xb1, 0xf2, 0x5f, 0x4a, - 0x78, 0xee, 0xa1, 0xf7, 0x0a, 0x3a, 0xc7, 0xa8, 0x50, 0x64, 0x28, 0x52, 0x8e, 0xc6, 0xad, 0x98, - 0x42, 0xa6, 0xa3, 0x3a, 0x60, 0x7f, 0x76, 0x58, 0x86, 0xca, 0xd4, 0x01, 0xfb, 0xb3, 0xab, 0x50, - 0x8d, 0x3f, 0x8c, 0xb9, 0xc6, 0xd2, 0x6d, 0xe9, 0xea, 0xa9, 0x49, 0x03, 0xeb, 0x29, 0xd8, 0xbc, - 0x65, 0x74, 0x93, 0x3d, 0x80, 0xeb, 0xe1, 0x5d, 0x5f, 0x14, 0x20, 0xe4, 0x29, 0x74, 0xb2, 0xe0, - 0x49, 0xfe, 0xda, 0x70, 0xe1, 0x84, 0xef, 0x4d, 0x1a, 0xd4, 0x43, 0x84, 0x55, 0xbf, 0xf1, 0xb8, - 0xc8, 0xc9, 0xf7, 0x70, 0xd7, 0x37, 0x69, 0xb8, 0x75, 0xaf, 0xd7, 0x50, 0x80, 0xee, 0x6c, 0x34, - 0xd1, 0xde, 0x7b, 0x3f, 0xfe, 0xf6, 0xe7, 0xcf, 0xad, 0xfb, 0x64, 0xdb, 0xaf, 0xf2, 0xc9, 0xc1, - 0xc0, 0x58, 0x8d, 0xac, 0xac, 0x7f, 0x48, 0x3e, 0x88, 0x9e, 0x1f, 0xbd, 0xbd, 0xda, 0x8b, 0x7e, - 0xbd, 0xda, 0x8b, 0xfe, 0xb8, 0xda, 0x8b, 0x5e, 0x7d, 0xf8, 0xef, 0xfe, 0x48, 0xd2, 0x82, 0xa3, - 0xb0, 0xb5, 0x93, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xc3, 0x5b, 0xa5, 0x9e, 0xf1, 0x08, 0x00, - 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// EventingClient is the client API for Eventing service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type EventingClient interface { - StartEventSource(ctx context.Context, in *EventSource, opts ...grpc.CallOption) (Eventing_StartEventSourceClient, error) -} - -type eventingClient struct { - cc *grpc.ClientConn -} - -func NewEventingClient(cc *grpc.ClientConn) EventingClient { - return &eventingClient{cc} -} - -func (c *eventingClient) StartEventSource(ctx context.Context, in *EventSource, opts ...grpc.CallOption) (Eventing_StartEventSourceClient, error) { - stream, err := c.cc.NewStream(ctx, &_Eventing_serviceDesc.Streams[0], "/generic.Eventing/StartEventSource", opts...) - if err != nil { - return nil, err - } - x := &eventingStartEventSourceClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type Eventing_StartEventSourceClient interface { - Recv() (*Event, error) - grpc.ClientStream -} - -type eventingStartEventSourceClient struct { - grpc.ClientStream -} - -func (x *eventingStartEventSourceClient) Recv() (*Event, error) { - m := new(Event) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// EventingServer is the server API for Eventing service. -type EventingServer interface { - StartEventSource(*EventSource, Eventing_StartEventSourceServer) error -} - -// UnimplementedEventingServer can be embedded to have forward compatible implementations. -type UnimplementedEventingServer struct { -} - -func (*UnimplementedEventingServer) StartEventSource(req *EventSource, srv Eventing_StartEventSourceServer) error { - return status.Errorf(codes.Unimplemented, "method StartEventSource not implemented") -} - -func RegisterEventingServer(s *grpc.Server, srv EventingServer) { - s.RegisterService(&_Eventing_serviceDesc, srv) -} - -func _Eventing_StartEventSource_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(EventSource) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(EventingServer).StartEventSource(m, &eventingStartEventSourceServer{stream}) -} - -type Eventing_StartEventSourceServer interface { - Send(*Event) error - grpc.ServerStream -} - -type eventingStartEventSourceServer struct { - grpc.ServerStream -} - -func (x *eventingStartEventSourceServer) Send(m *Event) error { - return x.ServerStream.SendMsg(m) -} - -var _Eventing_serviceDesc = grpc.ServiceDesc{ - ServiceName: "generic.Eventing", - HandlerType: (*EventingServer)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "StartEventSource", - Handler: _Eventing_StartEventSource_Handler, - ServerStreams: true, - }, - }, - Metadata: "server/application/events.proto", + // 958 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xcf, 0x6e, 0xdc, 0xb6, + 0x13, 0xfe, 0x69, 0xfd, 0x7f, 0x76, 0xed, 0xe4, 0x47, 0xdb, 0x29, 0x61, 0x18, 0xce, 0x62, 0x61, + 0x14, 0x8b, 0x20, 0xd1, 0xc2, 0xee, 0x1f, 0x24, 0x41, 0x51, 0xc0, 0x81, 0x5d, 0xd4, 0xad, 0xdd, + 0x16, 0x72, 0x93, 0x43, 0x6e, 0xb4, 0x34, 0xd1, 0x32, 0x2b, 0x91, 0x2c, 0xc9, 0x5d, 0x60, 0xef, + 0x7d, 0xa1, 0xbe, 0x45, 0x8e, 0x7d, 0x82, 0xa2, 0xf0, 0xa5, 0xaf, 0x51, 0x90, 0x92, 0xd6, 0x94, + 0xeb, 0x43, 0xd3, 0x1b, 0xe7, 0x9b, 0x6f, 0x86, 0xfc, 0x38, 0xc3, 0x91, 0xe0, 0xb1, 0x41, 0x3d, + 0x43, 0x3d, 0x62, 0x4a, 0x15, 0x3c, 0x65, 0x96, 0x4b, 0x31, 0xc2, 0x19, 0x0a, 0x6b, 0x62, 0xa5, + 0xa5, 0x95, 0x64, 0x2d, 0x47, 0x81, 0x9a, 0xa7, 0x7b, 0x17, 0x39, 0xb7, 0xe3, 0xe9, 0x75, 0x9c, + 0xca, 0x72, 0xc4, 0x74, 0x2e, 0x95, 0x96, 0xef, 0xfd, 0xe2, 0x59, 0x9a, 0x8d, 0x66, 0xc7, 0x23, + 0x35, 0xc9, 0x47, 0x4c, 0x71, 0xd3, 0x4a, 0x35, 0x3b, 0x62, 0x85, 0x1a, 0xb3, 0xa3, 0x91, 0xcf, + 0xc2, 0x2c, 0x66, 0x55, 0xda, 0xbd, 0xcf, 0x27, 0xcf, 0x4d, 0xcc, 0xa5, 0x8b, 0x28, 0x59, 0x3a, + 0xe6, 0x02, 0xf5, 0xfc, 0x36, 0x45, 0x89, 0x96, 0x8d, 0x66, 0xff, 0x8c, 0xda, 0xc9, 0xa5, 0xdf, + 0xd8, 0xca, 0x91, 0x5b, 0xd5, 0xe8, 0x7e, 0x2e, 0x65, 0x5e, 0xa0, 0x0b, 0x1d, 0x31, 0x21, 0xa4, + 0xf5, 0x7b, 0xd7, 0x02, 0x06, 0x2f, 0xa0, 0x7b, 0xe6, 0x04, 0x5d, 0xc9, 0xa9, 0x4e, 0x91, 0x10, + 0x58, 0x16, 0xac, 0x44, 0x1a, 0xf5, 0x3b, 0xc3, 0x8d, 0xc4, 0xaf, 0xc9, 0x23, 0x58, 0x4d, 0xa5, + 0x78, 0xc7, 0x73, 0xda, 0xe9, 0x47, 0xc3, 0x5e, 0x52, 0x5b, 0x83, 0x2f, 0x60, 0xc5, 0x87, 0xde, + 0x1b, 0x44, 0x61, 0x4d, 0xb1, 0x79, 0x21, 0x59, 0x46, 0x3b, 0xfd, 0xce, 0xb0, 0x97, 0x34, 0xe6, + 0xe0, 0xaf, 0x08, 0x7a, 0x3e, 0xee, 0xa7, 0x0a, 0x20, 0x03, 0xd8, 0xb0, 0xbc, 0x44, 0x63, 0x59, + 0xa9, 0xaa, 0x1c, 0xaf, 0x96, 0x3f, 0xfc, 0xf1, 0xf8, 0x7f, 0xc9, 0x2d, 0xec, 0xce, 0x20, 0xaf, + 0xdf, 0x63, 0x6a, 0xeb, 0x6c, 0xb5, 0x45, 0x9e, 0xc1, 0xaa, 0xf1, 0x27, 0xa7, 0x4b, 0xfd, 0xce, + 0xb0, 0x7b, 0xbc, 0x1b, 0xd7, 0x05, 0x89, 0x7f, 0xf4, 0x84, 0x4a, 0x56, 0x52, 0x93, 0xc8, 0x53, + 0x58, 0x45, 0xad, 0xa5, 0x36, 0x74, 0xb9, 0xbf, 0x34, 0xec, 0x1e, 0xef, 0xdc, 0xa1, 0x9f, 0x39, + 0x67, 0x52, 0x73, 0xc8, 0xd7, 0xd0, 0x65, 0x4a, 0xbd, 0x41, 0x6d, 0xdc, 0x85, 0xd1, 0x95, 0x7e, + 0x34, 0xec, 0x1e, 0xef, 0x2f, 0x42, 0x4e, 0x6e, 0x2b, 0xd9, 0x70, 0x92, 0x30, 0x60, 0xf0, 0xeb, + 0x06, 0xf4, 0xc2, 0x63, 0x90, 0x18, 0x1e, 0x64, 0x68, 0xb8, 0xc6, 0xec, 0x92, 0x09, 0xfe, 0x0e, + 0x8d, 0xa5, 0x51, 0x3f, 0x5a, 0xe8, 0xbd, 0xeb, 0x24, 0x4f, 0x61, 0x8b, 0xa5, 0x76, 0xca, 0x8a, + 0x05, 0xbd, 0x13, 0xd0, 0xef, 0xf8, 0xc8, 0xa7, 0xd0, 0xcd, 0xb9, 0x5d, 0x50, 0x97, 0x02, 0x6a, + 0xe8, 0x20, 0x07, 0xb0, 0xa6, 0x51, 0xc9, 0xd7, 0xc9, 0x05, 0x5d, 0x0e, 0x38, 0x0d, 0x48, 0x28, + 0x2c, 0x2b, 0x66, 0xc7, 0x5e, 0x6f, 0xe3, 0xf4, 0x08, 0xe9, 0xc3, 0xba, 0xc6, 0x19, 0x77, 0xea, + 0xe8, 0x6a, 0xe0, 0x5d, 0xa0, 0xe4, 0x09, 0x6c, 0xa6, 0xb2, 0x2c, 0xb9, 0xbd, 0x44, 0x63, 0x58, + 0x8e, 0x74, 0x2d, 0xa0, 0xb5, 0x5d, 0x64, 0x08, 0xbd, 0x0a, 0x38, 0x99, 0xda, 0xb1, 0xd4, 0x74, + 0x3d, 0xa0, 0xb6, 0x3c, 0xe4, 0x3b, 0x80, 0xca, 0x3e, 0x65, 0x16, 0xe9, 0x86, 0xaf, 0xc3, 0x93, + 0xb8, 0x7a, 0x23, 0x71, 0xf8, 0x46, 0x62, 0x35, 0xc9, 0x1d, 0x60, 0x62, 0xf7, 0x46, 0xe2, 0xd9, + 0x51, 0xfc, 0x33, 0x2f, 0x31, 0x09, 0xa2, 0x9d, 0x7a, 0xa6, 0xd4, 0x0f, 0xae, 0x5f, 0x21, 0x54, + 0x5f, 0x83, 0xe4, 0x5b, 0xd8, 0x60, 0x4a, 0x5d, 0xb0, 0x6b, 0x2c, 0x0c, 0xed, 0xfa, 0x2e, 0x39, + 0xbc, 0xb7, 0xa9, 0x5c, 0xfd, 0x2b, 0xda, 0x99, 0xb0, 0x7a, 0xde, 0xf4, 0xec, 0x22, 0x98, 0x1c, + 0x02, 0x98, 0xb9, 0x48, 0xaf, 0x2c, 0xb3, 0x53, 0x43, 0x7b, 0xc1, 0x66, 0x01, 0x4e, 0xde, 0xc0, + 0x66, 0x6d, 0x69, 0x8b, 0xd9, 0x89, 0xa5, 0x9b, 0x1f, 0x2b, 0xaf, 0xb9, 0xdd, 0x56, 0x1a, 0x92, + 0xc0, 0x96, 0x03, 0xbe, 0xe1, 0x82, 0x9b, 0xb1, 0x4f, 0xbc, 0xf5, 0xd1, 0xf7, 0x76, 0x27, 0x03, + 0x19, 0x40, 0x6f, 0x8c, 0xac, 0xb0, 0xe3, 0x5a, 0xd3, 0x03, 0xa7, 0x29, 0x69, 0x61, 0xe4, 0x10, + 0x36, 0x2b, 0xbb, 0xe9, 0x80, 0x87, 0x9e, 0xd4, 0x06, 0x5d, 0x15, 0xd2, 0x62, 0x6a, 0x2c, 0x6a, + 0xfa, 0xff, 0xb0, 0x0a, 0x35, 0xe8, 0x66, 0xc2, 0x98, 0x1b, 0x2b, 0xf5, 0xfc, 0x3c, 0xa3, 0xa4, + 0x1f, 0x0d, 0x97, 0x9a, 0xfb, 0x5d, 0xc0, 0xe4, 0x25, 0xec, 0x4a, 0xe5, 0x06, 0x20, 0x97, 0xe2, + 0x6a, 0x2e, 0xd2, 0xa4, 0x69, 0xcd, 0xed, 0x20, 0xe3, 0xfd, 0x14, 0xb2, 0x0f, 0xab, 0x4c, 0xa9, + 0xd7, 0xe7, 0xa7, 0x74, 0x27, 0x20, 0xd7, 0x98, 0xeb, 0xcc, 0xba, 0x1d, 0x8c, 0x62, 0x29, 0xd2, + 0xdd, 0xb0, 0x33, 0x43, 0x0f, 0xf9, 0x12, 0xb6, 0x99, 0x52, 0xe7, 0xc2, 0x58, 0x26, 0x52, 0xf4, + 0x85, 0xff, 0x1e, 0xe7, 0xf4, 0x51, 0x10, 0x70, 0x1f, 0xc1, 0xbd, 0x6c, 0xab, 0x59, 0x3a, 0xe1, + 0x22, 0xbf, 0x44, 0x3b, 0x96, 0x19, 0xfd, 0x24, 0x7c, 0xd9, 0x6d, 0xdf, 0xde, 0x57, 0xb0, 0xd5, + 0x6e, 0x36, 0xf2, 0x10, 0x96, 0x26, 0x38, 0xaf, 0xa6, 0x47, 0xe2, 0x96, 0x64, 0x07, 0x56, 0x66, + 0xac, 0x98, 0x62, 0x35, 0x22, 0x92, 0xca, 0x78, 0xd9, 0x79, 0x1e, 0x0d, 0x7e, 0x8b, 0xa0, 0x1b, + 0x8c, 0x37, 0xf7, 0xbe, 0xed, 0x5c, 0x61, 0x6b, 0xf4, 0x78, 0x84, 0xec, 0xc1, 0x4a, 0x81, 0x33, + 0x2c, 0x5a, 0x63, 0xa6, 0x82, 0x5c, 0xc5, 0xca, 0xba, 0xa2, 0xe1, 0x64, 0x69, 0x40, 0x72, 0x01, + 0xeb, 0x05, 0x33, 0xf6, 0x0a, 0x51, 0xf8, 0xb1, 0xf2, 0x5f, 0x5a, 0x78, 0x91, 0x61, 0xf0, 0x16, + 0x7a, 0xa7, 0xa8, 0x50, 0x64, 0x28, 0x52, 0x8e, 0xc6, 0x7d, 0x62, 0x0a, 0x99, 0x4e, 0x6a, 0xc1, + 0x7e, 0xed, 0xb0, 0x0c, 0x95, 0xa9, 0x05, 0xfb, 0xb5, 0xeb, 0x50, 0x8d, 0xbf, 0x4c, 0xb9, 0xc6, + 0xd2, 0x7d, 0xa5, 0xab, 0xa3, 0x26, 0x2d, 0x6c, 0xa0, 0x60, 0xfb, 0x9e, 0xd1, 0x4d, 0x0e, 0x00, + 0x6e, 0x87, 0x77, 0xbd, 0x51, 0x80, 0x90, 0x17, 0xd0, 0xcb, 0x82, 0x23, 0xf9, 0x6d, 0xc3, 0x0f, + 0x4e, 0x78, 0xde, 0xa4, 0x45, 0x7d, 0x75, 0xf2, 0xe1, 0xe6, 0x20, 0xfa, 0xfd, 0xe6, 0x20, 0xfa, + 0xf3, 0xe6, 0x20, 0x7a, 0xfb, 0xd9, 0xbf, 0xfb, 0x55, 0x48, 0x0b, 0x8e, 0xc2, 0xd6, 0xbf, 0x1b, + 0x7f, 0x07, 0x00, 0x00, 0xff, 0xff, 0x57, 0x77, 0xe0, 0x62, 0x8a, 0x08, 0x00, 0x00, } func (m *EventSource) Marshal() (dAtA []byte, err error) { diff --git a/pkg/apiclient/events/events.pb.gw.go b/pkg/apiclient/events/events.pb.gw.go deleted file mode 100644 index 5f4e02c2ae1f3..0000000000000 --- a/pkg/apiclient/events/events.pb.gw.go +++ /dev/null @@ -1,147 +0,0 @@ -// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT. -// source: server/application/events.proto - -/* -Package events is a reverse proxy. - -It translates gRPC into RESTful JSON APIs. -*/ -package events - -import ( - "context" - "io" - "net/http" - - "github.com/golang/protobuf/descriptor" - "github.com/golang/protobuf/proto" - "github.com/grpc-ecosystem/grpc-gateway/runtime" - "github.com/grpc-ecosystem/grpc-gateway/utilities" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" -) - -// Suppress "imported and not used" errors -var _ codes.Code -var _ io.Reader -var _ status.Status -var _ = runtime.String -var _ = utilities.NewDoubleArray -var _ = descriptor.ForMessage -var _ = metadata.Join - -var ( - filter_Eventing_StartEventSource_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)} -) - -func request_Eventing_StartEventSource_0(ctx context.Context, marshaler runtime.Marshaler, client EventingClient, req *http.Request, pathParams map[string]string) (Eventing_StartEventSourceClient, runtime.ServerMetadata, error) { - var protoReq EventSource - var metadata runtime.ServerMetadata - - if err := req.ParseForm(); err != nil { - return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) - } - if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Eventing_StartEventSource_0); err != nil { - return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) - } - - stream, err := client.StartEventSource(ctx, &protoReq) - if err != nil { - return nil, metadata, err - } - header, err := stream.Header() - if err != nil { - return nil, metadata, err - } - metadata.HeaderMD = header - return stream, metadata, nil - -} - -// RegisterEventingHandlerServer registers the http handlers for service Eventing to "mux". -// UnaryRPC :call EventingServer directly. -// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. -// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterEventingHandlerFromEndpoint instead. -func RegisterEventingHandlerServer(ctx context.Context, mux *runtime.ServeMux, server EventingServer) error { - - mux.Handle("GET", pattern_Eventing_StartEventSource_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport") - _, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) - return - }) - - return nil -} - -// RegisterEventingHandlerFromEndpoint is same as RegisterEventingHandler but -// automatically dials to "endpoint" and closes the connection when "ctx" gets done. -func RegisterEventingHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { - conn, err := grpc.Dial(endpoint, opts...) - if err != nil { - return err - } - defer func() { - if err != nil { - if cerr := conn.Close(); cerr != nil { - grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) - } - return - } - go func() { - <-ctx.Done() - if cerr := conn.Close(); cerr != nil { - grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) - } - }() - }() - - return RegisterEventingHandler(ctx, mux, conn) -} - -// RegisterEventingHandler registers the http handlers for service Eventing to "mux". -// The handlers forward requests to the grpc endpoint over "conn". -func RegisterEventingHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { - return RegisterEventingHandlerClient(ctx, mux, NewEventingClient(conn)) -} - -// RegisterEventingHandlerClient registers the http handlers for service Eventing -// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "EventingClient". -// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "EventingClient" -// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in -// "EventingClient" to call the correct interceptors. -func RegisterEventingHandlerClient(ctx context.Context, mux *runtime.ServeMux, client EventingClient) error { - - mux.Handle("GET", pattern_Eventing_StartEventSource_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(req.Context()) - defer cancel() - inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - rctx, err := runtime.AnnotateContext(ctx, mux, req) - if err != nil { - runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) - return - } - resp, md, err := request_Eventing_StartEventSource_0(rctx, inboundMarshaler, client, req, pathParams) - ctx = runtime.NewServerMetadataContext(ctx, md) - if err != nil { - runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) - return - } - - forward_Eventing_StartEventSource_0(ctx, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...) - - }) - - return nil -} - -var ( - pattern_Eventing_StartEventSource_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"api", "v1", "stream", "events"}, "", runtime.AssumeColonVerbOpt(true))) -) - -var ( - forward_Eventing_StartEventSource_0 = runtime.ForwardResponseStream -) diff --git a/reposerver/repository/repository.go b/reposerver/repository/repository.go index a54a09e8c6e23..14de84b699138 100644 --- a/reposerver/repository/repository.go +++ b/reposerver/repository/repository.go @@ -3031,14 +3031,14 @@ func (s *Service) UpdateRevisionForPaths(_ context.Context, request *apiclient.U if err != nil { // Only warn with the error, no need to block anything if there is a caching error. logCtx.Warnf("error updating cached revision for repo %s with revision %s: %v", repo.Repo, revision, err) - return &apiclient.UpdateRevisionForPathsResponse{Revision: revision,}, nil + return &apiclient.UpdateRevisionForPathsResponse{Revision: revision}, nil } - return &apiclient.UpdateRevisionForPathsResponse{Revision: revision,}, nil + return &apiclient.UpdateRevisionForPathsResponse{Revision: revision}, nil } logCtx.Debugf("changes found for application %s in repo %s from revision %s to revision %s", request.AppName, repo.Repo, syncedRevision, revision) - return &apiclient.UpdateRevisionForPathsResponse{Changes: true, Revision: revision,}, nil + return &apiclient.UpdateRevisionForPathsResponse{Changes: true, Revision: revision}, nil } func (s *Service) updateCachedRevision(logCtx *log.Entry, oldRev string, newRev string, request *apiclient.UpdateRevisionForPathsRequest, gitClientOpts git.ClientOpts) error { diff --git a/reposerver/repository/repository_test.go b/reposerver/repository/repository_test.go index 14456fae1f34f..4f42df723d09e 100644 --- a/reposerver/repository/repository_test.go +++ b/reposerver/repository/repository_test.go @@ -3505,7 +3505,7 @@ func TestUpdateRevisionForPaths(t *testing.T) { Paths: []string{"."}, }, }, want: &apiclient.UpdateRevisionForPathsResponse{ - Revision:"632039659e542ed7de0c170a4fcc1c571b288fc0", + Revision: "632039659e542ed7de0c170a4fcc1c571b288fc0", }, wantErr: assert.NoError}, {name: "ChangedFilesDoNothing", fields: func() fields { s, _, c := newServiceWithOpt(t, func(gitClient *gitmocks.Client, helmClient *helmmocks.Client, paths *iomocks.TempPaths) { @@ -3532,8 +3532,8 @@ func TestUpdateRevisionForPaths(t *testing.T) { Paths: []string{"."}, }, }, want: &apiclient.UpdateRevisionForPathsResponse{ - Changes: true, - Revision:"632039659e542ed7de0c170a4fcc1c571b288fc0", + Changes: true, + Revision: "632039659e542ed7de0c170a4fcc1c571b288fc0", }, wantErr: assert.NoError}, {name: "NoChangesUpdateCache", fields: func() fields { s, _, c := newServiceWithOpt(t, func(gitClient *gitmocks.Client, helmClient *helmmocks.Client, paths *iomocks.TempPaths) { @@ -3567,7 +3567,7 @@ func TestUpdateRevisionForPaths(t *testing.T) { KubeVersion: "v1.16.0", }, }, want: &apiclient.UpdateRevisionForPathsResponse{ - Revision:"632039659e542ed7de0c170a4fcc1c571b288fc0", + Revision: "632039659e542ed7de0c170a4fcc1c571b288fc0", }, wantErr: assert.NoError, cacheHit: &cacheHit{ previousRevision: "1e67a504d03def3a6a1125d934cb511680f72555", revision: "632039659e542ed7de0c170a4fcc1c571b288fc0", @@ -3606,7 +3606,7 @@ func TestUpdateRevisionForPaths(t *testing.T) { HasMultipleSources: true, }, }, want: &apiclient.UpdateRevisionForPathsResponse{ - Revision:"632039659e542ed7de0c170a4fcc1c571b288fc0", + Revision: "632039659e542ed7de0c170a4fcc1c571b288fc0", }, wantErr: assert.NoError, cacheHit: &cacheHit{ previousRevision: "1e67a504d03def3a6a1125d934cb511680f72555", revision: "632039659e542ed7de0c170a4fcc1c571b288fc0", diff --git a/server/application/application.go b/server/application/application.go index e4ca36f46185e..436bb1e64fc35 100644 --- a/server/application/application.go +++ b/server/application/application.go @@ -12,9 +12,6 @@ import ( "strings" "time" - "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" - "gopkg.in/yaml.v2" - kubecache "github.com/argoproj/gitops-engine/pkg/cache" "github.com/argoproj/gitops-engine/pkg/diff" "github.com/argoproj/gitops-engine/pkg/sync/common" @@ -81,30 +78,26 @@ const ( var ( watchAPIBufferSize = env.ParseNumFromEnv(argocommon.EnvWatchAPIBufferSize, 1000, 0, math.MaxInt32) permissionDeniedErr = status.Error(codes.PermissionDenied, "permission denied") - - applicationEventCacheExpiration = time.Minute * time.Duration(env.ParseNumFromEnv(argocommon.EnvApplicationEventCacheDuration, 20, 0, math.MaxInt32)) - resourceEventCacheExpiration = time.Minute * time.Duration(env.ParseNumFromEnv(argocommon.EnvResourceEventCacheDuration, 20, 0, math.MaxInt32)) ) // Server provides an Application service type Server struct { - ns string - kubeclientset kubernetes.Interface - appclientset appclientset.Interface - appLister applisters.ApplicationLister - appInformer cache.SharedIndexInformer - appBroadcaster Broadcaster - repoClientset apiclient.Clientset - kubectl kube.Kubectl - db db.ArgoDB - enf *rbac.Enforcer - projectLock sync.KeyLock - auditLogger *argo.AuditLogger - settingsMgr *settings.SettingsManager - cache *servercache.Cache - projInformer cache.SharedIndexInformer - enabledNamespaces []string - applicationEventReporter *applicationEventReporter + ns string + kubeclientset kubernetes.Interface + appclientset appclientset.Interface + appLister applisters.ApplicationLister + appInformer cache.SharedIndexInformer + appBroadcaster Broadcaster + repoClientset apiclient.Clientset + kubectl kube.Kubectl + db db.ArgoDB + enf *rbac.Enforcer + projectLock sync.KeyLock + auditLogger *argo.AuditLogger + settingsMgr *settings.SettingsManager + cache *servercache.Cache + projInformer cache.SharedIndexInformer + enabledNamespaces []string } // NewServer returns a new instance of the Application service @@ -150,8 +143,6 @@ func NewServer( projInformer: projInformer, enabledNamespaces: enabledNamespaces, } - s.applicationEventReporter = NewApplicationEventReporter(s) - return s, s.getAppResources } @@ -1243,304 +1234,6 @@ func (s *Server) Watch(q *application.ApplicationQuery, ws application.Applicati } } -func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing_StartEventSourceServer) error { - var ( - logCtx log.FieldLogger = log.StandardLogger() - selector labels.Selector - err error - ) - q := application.ApplicationQuery{} - if err := yaml.Unmarshal(es.Config, &q); err != nil { - logCtx.WithError(err).Error("failed to unmarshal event-source config") - return fmt.Errorf("failed to unmarshal event-source config: %w", err) - } - - if q.Name != nil { - logCtx = logCtx.WithField("application", *q.Name) - } - - claims := stream.Context().Value("claims") - - if q.Selector != nil { - selector, err = labels.Parse(*q.Selector) - if err != nil { - return err - } - } - - minVersion := 0 - if q.ResourceVersion != nil { - if minVersion, err = strconv.Atoi(*q.ResourceVersion); err != nil { - minVersion = 0 - } - } - - appNs := s.appNamespaceOrDefault(q.GetAppNamespace()) - - // sendIfPermitted is a helper to send the application to the client's streaming channel if the - // caller has RBAC privileges permissions to view it - sendIfPermitted := func(ctx context.Context, logCtx log.FieldLogger, a appv1.Application, ts string, ignoreResourceCache bool) error { - if appVersion, err := strconv.Atoi(a.ResourceVersion); err == nil && appVersion < minVersion { - return nil - } - - if selector != nil { - matchedEvent := (q.GetName() == "" || a.Name == q.GetName()) && selector.Matches(labels.Set(a.Labels)) - if !matchedEvent { - return nil - } - } - - if !s.isNamespaceEnabled(appNs) { - return security.NamespaceNotPermittedError(appNs) - } - - if !s.enf.Enforce(claims, rbacpolicy.ResourceApplications, rbacpolicy.ActionGet, a.RBACName(s.ns)) { - // do not emit apps user does not have accessing - return nil - } - - appInstanceLabelKey, err := s.settingsMgr.GetAppInstanceLabelKey() - if err != nil { - return err - } - trackingMethod := argoutil.GetTrackingMethod(s.settingsMgr) - - err = s.applicationEventReporter.streamApplicationEvents(ctx, logCtx, &a, es, stream, ts, ignoreResourceCache, appInstanceLabelKey, trackingMethod) - if err != nil { - return err - } - - if err := s.cache.SetLastApplicationEvent(&a, applicationEventCacheExpiration); err != nil { - logCtx.WithError(err).Error("failed to cache last sent application event") - return err - } - - return nil - } - - priorityQueueEnabled := env.ParseBoolFromEnv("CODEFRESH_PRIORITY_QUEUE", false) - - allEventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize) - onUpdateEventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize) - onDeleteEventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize) - onAddEventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize) - - v1ReporterEnabledFilter := func(event *appv1.ApplicationWatchEvent) bool { - if event.Type == watch.Bookmark { - return false // ignore this event - } - - rVersion, _ := s.settingsMgr.GetCodefreshReporterVersion() - if rVersion == string(settings.CodefreshV2ReporterVersion) { - logCtx.Info("v1 reporter disabled skipping event") - return false - } - return true - } - - if priorityQueueEnabled { - unsubscribeOnUpdateChannel := s.appBroadcaster.Subscribe(onUpdateEventsChannel, func(event *appv1.ApplicationWatchEvent) bool { - return event.Type == watch.Modified - }, v1ReporterEnabledFilter) - - unsubscribeOnDeleteChannel := s.appBroadcaster.Subscribe(onDeleteEventsChannel, func(event *appv1.ApplicationWatchEvent) bool { - return event.Type == watch.Deleted - }, v1ReporterEnabledFilter) - - unsubscribeOnAddChannel := s.appBroadcaster.Subscribe(onAddEventsChannel, func(event *appv1.ApplicationWatchEvent) bool { - return event.Type == watch.Added - }, v1ReporterEnabledFilter) - - defer unsubscribeOnUpdateChannel() - defer unsubscribeOnDeleteChannel() - defer unsubscribeOnAddChannel() - } else { - unsubscribeEventsChannel := s.appBroadcaster.Subscribe(allEventsChannel, v1ReporterEnabledFilter) - defer unsubscribeEventsChannel() - } - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - for { - select { - case event := <-onAddEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=true - { - logCtx.Infof("OnAdd channel size is %d", len(onAddEventsChannel)) - logAppEvent := logCtx.WithFields(log.Fields{ - "app": event.Application.Name, - "type": event.Type, - }) - logAppEvent.Infof("Received application added event") - err = s.processEvent(event, logAppEvent, stream, sendIfPermitted) - if err != nil { - return err - } - } - case event := <-onDeleteEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=true - { - logCtx.Infof("OnDelete channel size is %d", len(onDeleteEventsChannel)) - logAppEvent := logCtx.WithFields(log.Fields{ - "app": event.Application.Name, - "type": event.Type, - }) - logAppEvent.Infof("Received application deleted event") - err = s.processEvent(event, logAppEvent, stream, sendIfPermitted) - if err != nil { - return err - } - } - case event := <-onUpdateEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=true - { - logCtx.Infof("OnUpdate channel size is %d", len(onUpdateEventsChannel)) - logAppEvent := logCtx.WithFields(log.Fields{ - "app": event.Application.Name, - "type": event.Type, - }) - logAppEvent.Infof("Received application update event") - err = s.processEvent(event, logAppEvent, stream, sendIfPermitted) - if err != nil { - return err - } - } - case event := <-allEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=false - { - logCtx.Infof("All events channel size is %d", len(allEventsChannel)) - logAppEvent := logCtx.WithFields(log.Fields{ - "app": event.Application.Name, - "type": event.Type, - }) - logAppEvent.Infof("Received application event") - err = s.processEvent(event, logAppEvent, stream, sendIfPermitted) - if err != nil { - return err - } - } - case <-ticker.C: - var err error - ts := time.Now().Format("2006-01-02T15:04:05.000Z") - payload := events.EventPayload{Timestamp: ts} - payloadBytes, err := json.Marshal(&payload) - if err != nil { - log.Errorf("failed to marshal payload for heartbeat: %s", err.Error()) - break - } - - ev := &events.Event{Payload: payloadBytes, Name: es.Name} - if err = stream.Send(ev); err != nil { - log.Errorf("failed to send heartbeat: %s", err.Error()) - break - } - case <-stream.Context().Done(): - return nil - } - } -} - -func (s *Server) processEvent( - event *appv1.ApplicationWatchEvent, - logCtx log.FieldLogger, - stream events.Eventing_StartEventSourceServer, - sendIfPermitted func(ctx context.Context, logCtx log.FieldLogger, a appv1.Application, ts string, ignoreResourceCache bool) error, -) error { - shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event) - if !shouldProcess { - logCtx.Infof("ignore event") - return nil - } - - ts := time.Now().Format("2006-01-02T15:04:05.000Z") - ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute) - err := sendIfPermitted(ctx, logCtx, event.Application, ts, ignoreResourceCache) - if err != nil { - logCtx.WithError(err).Error("failed to stream application events") - if strings.Contains(err.Error(), "context deadline exceeded") { - logCtx.Info("Closing event-source connection") - cancel() - return err - } - } - - cancel() - return nil -} - -func (s *Server) ValidateSrcAndDst(ctx context.Context, requset *application.ApplicationValidationRequest) (*application.ApplicationValidateResponse, error) { - app := requset.Application - proj, err := argo.GetAppProject(app, applisters.NewAppProjectLister(s.projInformer.GetIndexer()), s.ns, s.settingsMgr, s.db, ctx) - if err != nil { - entity := projectEntity - if apierr.IsNotFound(err) { - errMsg := fmt.Sprintf("application references project %s which does not exist", app.Spec.Project) - return &application.ApplicationValidateResponse{ - Error: &errMsg, - Entity: &entity, - }, nil - } - errMsg := err.Error() - return &application.ApplicationValidateResponse{ - Error: &errMsg, - Entity: &entity, - }, nil - } - - if err := validateDestination(ctx, &app.Spec.Destination, s.db); err != nil { - entity := destinationEntity - errMsg := fmt.Sprintf("application destination spec for %s is invalid: %s", app.ObjectMeta.Name, err.Error()) - return &application.ApplicationValidateResponse{ - Error: &errMsg, - Entity: &entity, - }, nil - } - var conditions []appv1.ApplicationCondition - conditions, err = argo.ValidateRepo(ctx, app, s.repoClientset, s.db, s.kubectl, proj, s.settingsMgr) - if err != nil { - entity := sourceEntity - errMsg := err.Error() - return &application.ApplicationValidateResponse{ - Error: &errMsg, - Entity: &entity, - }, nil - } - if len(conditions) > 0 { - entity := sourceEntity - errMsg := fmt.Sprintf("application spec for %s is invalid: %s", app.ObjectMeta.Name, argo.FormatAppConditions(conditions)) - return &application.ApplicationValidateResponse{ - Error: &errMsg, - Entity: &entity, - }, nil - } - return &application.ApplicationValidateResponse{ - Error: nil, - Entity: nil, - }, nil -} - -// validates destination name (argo.ValidateDestination) and server with extra logic -func validateDestination(ctx context.Context, dest *appv1.ApplicationDestination, db db.ArgoDB) error { - err := argo.ValidateDestination(ctx, dest, db) - - if err != nil { - return err - } - - if dest.Server != "" { - // Ensure the k8s cluster the app is referencing, is configured in Argo CD - _, err := db.GetCluster(ctx, dest.Server) - if err != nil { - if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.NotFound { - return fmt.Errorf("cluster '%s' has not been configured", dest.Server) - } else { - return err - } - } - } else if dest.Server == "" { - return fmt.Errorf("destination server missing from app spec") - } - - return nil -} - func (s *Server) validateAndNormalizeApp(ctx context.Context, app *appv1.Application, proj *appv1.AppProject, validate bool) error { if app.GetName() == "" { return fmt.Errorf("resource name may not be empty") diff --git a/server/application/application_errors_parser.go b/server/application/application_errors_parser.go deleted file mode 100644 index a4da579f57e17..0000000000000 --- a/server/application/application_errors_parser.go +++ /dev/null @@ -1,147 +0,0 @@ -package application - -import ( - "fmt" - "strings" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/argoproj/gitops-engine/pkg/health" - "github.com/argoproj/gitops-engine/pkg/sync/common" - - "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" - appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" -) - -func parseApplicationSyncResultErrors(os *appv1.OperationState) []*events.ObjectError { - var errors []*events.ObjectError - // mean that resource not found as sync result but application can contain error inside operation state itself, - // for example app created with invalid yaml - if os.Phase == common.OperationError || os.Phase == common.OperationFailed { - errors = append(errors, &events.ObjectError{ - Type: "sync", - Level: "error", - Message: os.Message, - LastSeen: os.StartedAt, - }) - } - return errors -} - -var syncTaskUnsuccessfullErrorMessage = "one or more synchronization tasks completed unsuccessfully" -var syncTaskNotValidErrorMessage = "one or more synchronization tasks are not valid" - -func parseApplicationSyncResultErrorsFromConditions(status appv1.ApplicationStatus) []*events.ObjectError { - var errs []*events.ObjectError - if status.Conditions == nil { - return errs - } - for _, cnd := range status.Conditions { - if !strings.Contains(strings.ToLower(cnd.Type), "error") { - continue - } - - lastSeen := metav1.Now() - if cnd.LastTransitionTime != nil { - lastSeen = *cnd.LastTransitionTime - } - - if (strings.Contains(cnd.Message, syncTaskUnsuccessfullErrorMessage) || strings.Contains(cnd.Message, syncTaskNotValidErrorMessage)) && status.OperationState != nil && status.OperationState.SyncResult != nil && status.OperationState.SyncResult.Resources != nil { - resourcesSyncErrors := parseAggregativeResourcesSyncErrors(status.OperationState.SyncResult.Resources) - - errs = append(errs, resourcesSyncErrors...) - } else { - errs = append(errs, &events.ObjectError{ - Type: "sync", - Level: "error", - Message: cnd.Message, - LastSeen: lastSeen, - }) - } - } - return errs -} - -func parseResourceSyncResultErrors(rs *appv1.ResourceStatus, os *appv1.OperationState) []*events.ObjectError { - errors := []*events.ObjectError{} - if os.SyncResult == nil { - return errors - } - - _, sr := os.SyncResult.Resources.Find( - rs.Group, - rs.Kind, - rs.Namespace, - rs.Name, - common.SyncPhaseSync, - ) - - if sr == nil || !(sr.HookPhase == common.OperationFailed || sr.HookPhase == common.OperationError || sr.Status == common.ResultCodeSyncFailed) { - return errors - } - - errors = append(errors, &events.ObjectError{ - Type: "sync", - Level: "error", - Message: sr.Message, - LastSeen: os.StartedAt, - }) - - return errors -} - -func parseAggregativeHealthErrors(rs *appv1.ResourceStatus, apptree *appv1.ApplicationTree) []*events.ObjectError { - errs := make([]*events.ObjectError, 0) - - if apptree == nil { - return errs - } - - n := apptree.FindNode(rs.Group, rs.Kind, rs.Namespace, rs.Name) - if n == nil { - return errs - } - - childNodes := n.GetAllChildNodes(apptree, "") - - for _, cn := range childNodes { - if cn.Health != nil && cn.Health.Status == health.HealthStatusDegraded { - errs = append(errs, &events.ObjectError{ - Type: "health", - Level: "error", - Message: cn.Health.Message, - LastSeen: *cn.CreatedAt, - }) - } - } - - return errs -} - -func parseAggregativeResourcesSyncErrors(resourceResults appv1.ResourceResults) []*events.ObjectError { - var errs []*events.ObjectError - - if resourceResults == nil { - return errs - } - - for _, rr := range resourceResults { - if rr.Message != "" { - objectError := events.ObjectError{ - Type: "sync", - Level: "error", - LastSeen: metav1.Now(), - Message: fmt.Sprintf("Resource %s(%s): \n %s", rr.Kind, rr.Name, rr.Message), - } - if rr.Status == common.ResultCodeSyncFailed { - errs = append(errs, &objectError) - } - if rr.HookPhase == common.OperationFailed || rr.HookPhase == common.OperationError { - errs = append(errs, &objectError) - } - - } - } - - return errs -} diff --git a/server/application/application_event_reporter.go b/server/application/application_event_reporter.go deleted file mode 100644 index 9371316b45f1d..0000000000000 --- a/server/application/application_event_reporter.go +++ /dev/null @@ -1,811 +0,0 @@ -package application - -import ( - "context" - "encoding/json" - "fmt" - "reflect" - "strings" - - "github.com/argoproj/argo-cd/v2/util/argo" - - "github.com/argoproj/gitops-engine/pkg/health" - "github.com/argoproj/gitops-engine/pkg/utils/kube" - "github.com/argoproj/gitops-engine/pkg/utils/text" - log "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/watch" - "sigs.k8s.io/yaml" - - "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" - "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" - appv1reg "github.com/argoproj/argo-cd/v2/pkg/apis/application" - appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - "github.com/argoproj/argo-cd/v2/reposerver/apiclient" -) - -type applicationEventReporter struct { - server *Server -} - -func NewApplicationEventReporter(server *Server) *applicationEventReporter { - return &applicationEventReporter{server} -} - -func (s *applicationEventReporter) shouldSendResourceEvent(a *appv1.Application, rs appv1.ResourceStatus) bool { - logCtx := logWithResourceStatus(log.WithFields(log.Fields{ - "app": a.Name, - "gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind), - "resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name), - }), rs) - - cachedRes, err := s.server.cache.GetLastResourceEvent(a, rs, getApplicationLatestRevision(a)) - if err != nil { - logCtx.Debug("resource not in cache") - return true - } - - if reflect.DeepEqual(&cachedRes, &rs) { - logCtx.Debug("resource status not changed") - - // status not changed - return false - } - - logCtx.Info("resource status changed") - return true -} - -func getParentAppName(a *appv1.Application, appInstanceLabelKey string, trackingMethod appv1.TrackingMethod) string { - resourceTracking := argo.NewResourceTracking() - unApp := kube.MustToUnstructured(&a) - - return resourceTracking.GetAppName(unApp, appInstanceLabelKey, trackingMethod) -} - -func isChildApp(parentAppName string) bool { - return parentAppName != "" -} - -func getAppAsResource(a *appv1.Application) *appv1.ResourceStatus { - return &appv1.ResourceStatus{ - Name: a.Name, - Namespace: a.Namespace, - Version: "v1alpha1", - Kind: "Application", - Group: "argoproj.io", - Status: a.Status.Sync.Status, - Health: &a.Status.Health, - RequiresPruning: a.DeletionTimestamp != nil, - } -} - -func (s *applicationEventReporter) getDesiredManifests(ctx context.Context, a *appv1.Application, logCtx log.FieldLogger) (*apiclient.ManifestResponse, error, bool) { - // get the desired state manifests of the application - desiredManifests, err := s.server.GetManifests(ctx, &application.ApplicationManifestQuery{ - Name: &a.Name, - Revision: &a.Status.Sync.Revision, - }) - if err != nil { - // if it's manifest generation error we need to still report the actual state - // of the resources, but since we can't get the desired state, we will report - // each resource with empty desired state - logCtx.WithError(err).Warn("failed to get application desired state manifests, reporting actual state only") - desiredManifests = &apiclient.ManifestResponse{Manifests: []*apiclient.Manifest{}} - return desiredManifests, nil, true // will ignore requiresPruning=true to not delete resources with actual state - } - return desiredManifests, nil, false -} - -func (s *applicationEventReporter) streamApplicationEvents( - ctx context.Context, - logCtx log.FieldLogger, - a *appv1.Application, - es *events.EventSource, - stream events.Eventing_StartEventSourceServer, - ts string, - ignoreResourceCache bool, - appInstanceLabelKey string, - trackingMethod appv1.TrackingMethod, -) error { - - logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events") - - appTree, err := s.server.getAppResources(ctx, a) - if err != nil { - if strings.Contains(err.Error(), "context deadline exceeded") { - return fmt.Errorf("failed to get application tree: %w", err) - } - - // we still need process app even without tree, it is in case of app yaml originally contain error, - // we still want to show it the erorrs that related to it on codefresh ui - logCtx.WithError(err).Warn("failed to get application tree, resuming") - } - - desiredManifests, err, manifestGenErr := s.getDesiredManifests(ctx, a, logCtx) - if err != nil { - return err - } - - parentAppName := getParentAppName(a, appInstanceLabelKey, trackingMethod) - - if isChildApp(parentAppName) { - parentApplicationEntity, err := s.server.Get(ctx, &application.ApplicationQuery{ - Name: &parentAppName, - }) - if err != nil { - return fmt.Errorf("failed to get parent application entity: %w", err) - } - - rs := getAppAsResource(a) - - parentDesiredManifests, err, manifestGenErr := s.getDesiredManifests(ctx, parentApplicationEntity, logCtx) - if err != nil { - logCtx.WithError(err).Warn("failed to get parent application's desired manifests, resuming") - } - - // helm app hasnt revision - // TODO: add check if it helm application - parentOperationRevision := getOperationRevision(parentApplicationEntity) - parentRevisionMetadata, err := s.getApplicationRevisionDetails(ctx, parentApplicationEntity, parentOperationRevision) - if err != nil { - logCtx.WithError(err).Warn("failed to get parent application's revision metadata, resuming") - } - - err = s.processResource(ctx, *rs, parentApplicationEntity, logCtx, ts, parentDesiredManifests, stream, appTree, es, manifestGenErr, a, parentRevisionMetadata, true, appInstanceLabelKey, trackingMethod, desiredManifests.ApplicationVersions) - if err != nil { - return err - } - } else { - // will get here only for root applications (not managed as a resource by another application) - appEvent, err := s.getApplicationEventPayload(ctx, a, es, ts, appInstanceLabelKey, trackingMethod, desiredManifests.ApplicationVersions) - if err != nil { - return fmt.Errorf("failed to get application event: %w", err) - } - - if appEvent == nil { - // event did not have an OperationState - skip all events - return nil - } - - logWithAppStatus(a, logCtx, ts).Info("sending root application event") - if err := stream.Send(appEvent); err != nil { - return fmt.Errorf("failed to send event for root application %s/%s: %w", a.Namespace, a.Name, err) - } - } - - revisionMetadata, _ := s.getApplicationRevisionDetails(ctx, a, getOperationRevision(a)) - // for each resource in the application get desired and actual state, - // then stream the event - for _, rs := range a.Status.Resources { - if isApp(rs) { - continue - } - - err := s.processResource(ctx, rs, a, logCtx, ts, desiredManifests, stream, appTree, es, manifestGenErr, nil, revisionMetadata, ignoreResourceCache, appInstanceLabelKey, trackingMethod, nil) - if err != nil { - return err - } - } - return nil -} - -func (s *applicationEventReporter) getAppForResourceReporting( - rs appv1.ResourceStatus, - ctx context.Context, - a *appv1.Application, - revisionMetadata *appv1.RevisionMetadata, -) (*appv1.Application, *appv1.RevisionMetadata) { - if rs.Kind != "Rollout" { // for rollout it's crucial to report always correct operationSyncRevision - return a, revisionMetadata - } - - latestAppStatus, err := s.server.appLister.Applications(a.Namespace).Get(a.Name) - - if err != nil { - return a, revisionMetadata - } - - revisionMetadataToReport, err := s.getApplicationRevisionDetails(ctx, latestAppStatus, getOperationRevision(latestAppStatus)) - - if err != nil { - return a, revisionMetadata - } - - return latestAppStatus, revisionMetadataToReport -} - -func (s *applicationEventReporter) processResource( - ctx context.Context, - rs appv1.ResourceStatus, - parentApplication *appv1.Application, - logCtx log.FieldLogger, - ts string, - desiredManifests *apiclient.ManifestResponse, - stream events.Eventing_StartEventSourceServer, - appTree *appv1.ApplicationTree, - es *events.EventSource, - manifestGenErr bool, - originalApplication *appv1.Application, - revisionMetadata *appv1.RevisionMetadata, - ignoreResourceCache bool, - appInstanceLabelKey string, - trackingMethod appv1.TrackingMethod, - applicationVersions *apiclient.ApplicationVersions, -) error { - logCtx = logCtx.WithFields(log.Fields{ - "gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind), - "resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name), - }) - - if rs.Health == nil && rs.Status == appv1.SyncStatusCodeSynced { - // for resources without health status we need to add 'Healthy' status - // when they are synced because we might have sent an event with 'Missing' - // status earlier and they would be stuck in it if we don't switch to 'Healthy' - rs.Health = &appv1.HealthStatus{ - Status: health.HealthStatusHealthy, - } - } - - if !ignoreResourceCache && !s.shouldSendResourceEvent(parentApplication, rs) { - return nil - } - - // get resource desired state - desiredState := getResourceDesiredState(&rs, desiredManifests, logCtx) - - // get resource actual state - actualState, err := s.server.GetResource(ctx, &application.ApplicationResourceRequest{ - Name: &parentApplication.Name, - Namespace: &rs.Namespace, - ResourceName: &rs.Name, - Version: &rs.Version, - Group: &rs.Group, - Kind: &rs.Kind, - }) - if err != nil { - if !strings.Contains(err.Error(), "not found") { - // only return error if there is no point in trying to send the - // next resource. For example if the shared context has exceeded - // its deadline - if strings.Contains(err.Error(), "context deadline exceeded") { - return fmt.Errorf("failed to get actual state: %w", err) - } - - logCtx.WithError(err).Warn("failed to get actual state, resuming") - return nil - } - - manifest := "" - // empty actual state - actualState = &application.ApplicationResourceResponse{Manifest: &manifest} - } - - parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, parentApplication, revisionMetadata) - - var originalAppRevisionMetadata *appv1.RevisionMetadata = nil - - if originalApplication != nil { - originalAppRevisionMetadata, _ = s.getApplicationRevisionDetails(ctx, originalApplication, getOperationRevision(originalApplication)) - } - - ev, err := getResourceEventPayload(parentApplicationToReport, &rs, es, actualState, desiredState, appTree, manifestGenErr, ts, originalApplication, revisionMetadataToReport, originalAppRevisionMetadata, appInstanceLabelKey, trackingMethod, applicationVersions) - if err != nil { - logCtx.WithError(err).Warn("failed to get event payload, resuming") - return nil - } - - appRes := appv1.Application{} - if isApp(rs) && actualState.Manifest != nil && json.Unmarshal([]byte(*actualState.Manifest), &appRes) == nil { - logWithAppStatus(&appRes, logCtx, ts).Info("streaming resource event") - } else { - logWithResourceStatus(logCtx, rs).Info("streaming resource event") - } - - if err := stream.Send(ev); err != nil { - if strings.Contains(err.Error(), "context deadline exceeded") { - return fmt.Errorf("failed to send resource event: %w", err) - } - - logCtx.WithError(err).Warn("failed to send resource event, resuming") - return nil - } - - if err := s.server.cache.SetLastResourceEvent(parentApplicationToReport, rs, resourceEventCacheExpiration, getApplicationLatestRevision(parentApplicationToReport)); err != nil { - logCtx.WithError(err).Warn("failed to cache resource event") - } - - return nil -} - -func (s *applicationEventReporter) shouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool) { - logCtx := log.WithField("app", ae.Application.Name) - - if ae.Type == watch.Deleted { - logCtx.Info("application deleted") - return true, false - } - - cachedApp, err := s.server.cache.GetLastApplicationEvent(&ae.Application) - if err != nil || cachedApp == nil { - return true, false - } - - cachedApp.Status.ReconciledAt = ae.Application.Status.ReconciledAt // ignore those in the diff - cachedApp.Spec.Project = ae.Application.Spec.Project // - for i := range cachedApp.Status.Conditions { - cachedApp.Status.Conditions[i].LastTransitionTime = nil - } - for i := range ae.Application.Status.Conditions { - ae.Application.Status.Conditions[i].LastTransitionTime = nil - } - - // check if application changed to healthy status - if ae.Application.Status.Health.Status == health.HealthStatusHealthy && cachedApp.Status.Health.Status != health.HealthStatusHealthy { - return true, true - } - - if !reflect.DeepEqual(ae.Application.Spec, cachedApp.Spec) { - logCtx.Info("application spec changed") - return true, false - } - - if !reflect.DeepEqual(ae.Application.Status, cachedApp.Status) { - logCtx.Info("application status changed") - return true, false - } - - if !reflect.DeepEqual(ae.Application.Operation, cachedApp.Operation) { - logCtx.Info("application operation changed") - return true, false - } - - return false, false -} - -func isApp(rs appv1.ResourceStatus) bool { - return rs.GroupVersionKind().String() == appv1.ApplicationSchemaGroupVersionKind.String() -} - -func logWithAppStatus(a *appv1.Application, logCtx log.FieldLogger, ts string) *log.Entry { - return logCtx.WithFields(log.Fields{ - "sync": a.Status.Sync.Status, - "health": a.Status.Health.Status, - "resourceVersion": a.ResourceVersion, - "ts": ts, - }) -} - -func logWithResourceStatus(logCtx log.FieldLogger, rs appv1.ResourceStatus) log.FieldLogger { - logCtx = logCtx.WithField("sync", rs.Status) - if rs.Health != nil { - logCtx = logCtx.WithField("health", rs.Health.Status) - } - - return logCtx -} - -func getLatestAppHistoryItem(a *appv1.Application) *appv1.RevisionHistory { - if a.Status.History != nil && len(a.Status.History) > 0 { - return &a.Status.History[len(a.Status.History)-1] - } - - return nil -} - -func getApplicationLatestRevision(a *appv1.Application) string { - revision := a.Status.Sync.Revision - lastHistory := getLatestAppHistoryItem(a) - - if lastHistory != nil { - revision = lastHistory.Revision - } - - return revision -} - -func getOperationRevision(a *appv1.Application) string { - var revision string - if a != nil { - // this value will be used in case if application hasnt resources , like gitsource - revision = a.Status.Sync.Revision - if a.Status.OperationState != nil && a.Status.OperationState.Operation.Sync != nil && a.Status.OperationState.Operation.Sync.Revision != "" { - revision = a.Status.OperationState.Operation.Sync.Revision - } else if a.Operation != nil && a.Operation.Sync != nil && a.Operation.Sync.Revision != "" { - revision = a.Operation.Sync.Revision - } - } - - return revision -} - -func (s *applicationEventReporter) getApplicationRevisionDetails(ctx context.Context, a *appv1.Application, revision string) (*appv1.RevisionMetadata, error) { - name := a.GetName() - namespace := a.GetNamespace() - project := a.Spec.GetProject() - return s.server.RevisionMetadata(ctx, &application.RevisionMetadataQuery{ - Name: &name, - AppNamespace: &namespace, - Project: &project, - Revision: &revision, - }) -} - -func getLatestAppHistoryId(a *appv1.Application) int64 { - var id int64 - lastHistory := getLatestAppHistoryItem(a) - - if lastHistory != nil { - id = lastHistory.ID - } - - return id -} - -func getResourceEventPayload( - parentApplication *appv1.Application, - rs *appv1.ResourceStatus, - es *events.EventSource, - actualState *application.ApplicationResourceResponse, - desiredState *apiclient.Manifest, - apptree *appv1.ApplicationTree, - manifestGenErr bool, - ts string, - originalApplication *appv1.Application, // passed when rs is application - revisionMetadata *appv1.RevisionMetadata, - originalAppRevisionMetadata *appv1.RevisionMetadata, // passed when rs is application - appInstanceLabelKey string, - trackingMethod appv1.TrackingMethod, - applicationVersions *apiclient.ApplicationVersions, -) (*events.Event, error) { - var ( - err error - syncStarted = metav1.Now() - syncFinished *metav1.Time - errors = []*events.ObjectError{} - logCtx *log.Entry - ) - - if originalApplication != nil { - logCtx = log.WithField("application", originalApplication.Name) - } else { - logCtx = log.NewEntry(log.StandardLogger()) - } - - object := []byte(*actualState.Manifest) - - if originalAppRevisionMetadata != nil && len(object) != 0 { - actualObject, err := appv1.UnmarshalToUnstructured(*actualState.Manifest) - - if err == nil { - actualObject = addCommitDetailsToLabels(actualObject, originalAppRevisionMetadata) - object, err = actualObject.MarshalJSON() - if err != nil { - return nil, fmt.Errorf("failed to marshal unstructured object: %w", err) - } - } - } - if len(object) == 0 { - if len(desiredState.CompiledManifest) == 0 { - // no actual or desired state, don't send event - u := &unstructured.Unstructured{} - apiVersion := rs.Version - if rs.Group != "" { - apiVersion = rs.Group + "/" + rs.Version - } - - u.SetAPIVersion(apiVersion) - u.SetKind(rs.Kind) - u.SetName(rs.Name) - u.SetNamespace(rs.Namespace) - if originalAppRevisionMetadata != nil { - u = addCommitDetailsToLabels(u, originalAppRevisionMetadata) - } - - object, err = u.MarshalJSON() - if err != nil { - return nil, fmt.Errorf("failed to marshal unstructured object: %w", err) - } - } else { - // no actual state, use desired state as event object - unstructuredWithNamespace, err := addDestNamespaceToManifest([]byte(desiredState.CompiledManifest), rs) - if err != nil { - return nil, fmt.Errorf("failed to add destination namespace to manifest: %w", err) - } - if originalAppRevisionMetadata != nil { - unstructuredWithNamespace = addCommitDetailsToLabels(unstructuredWithNamespace, originalAppRevisionMetadata) - } - - object, _ = unstructuredWithNamespace.MarshalJSON() - } - } else if rs.RequiresPruning && !manifestGenErr { - // resource should be deleted - desiredState.CompiledManifest = "" - manifest := "" - actualState.Manifest = &manifest - } - - if (originalApplication != nil && originalApplication.DeletionTimestamp != nil) || parentApplication.ObjectMeta.DeletionTimestamp != nil { - // resource should be deleted in case if application in process of deletion - desiredState.CompiledManifest = "" - manifest := "" - actualState.Manifest = &manifest - } - - if parentApplication.Status.OperationState != nil { - syncStarted = parentApplication.Status.OperationState.StartedAt - syncFinished = parentApplication.Status.OperationState.FinishedAt - errors = append(errors, parseResourceSyncResultErrors(rs, parentApplication.Status.OperationState)...) - } - - // for primitive resources that are synced right away and don't require progression time (like configmap) - if rs.Status == appv1.SyncStatusCodeSynced && rs.Health != nil && rs.Health.Status == health.HealthStatusHealthy { - syncFinished = &syncStarted - } - - // parent application not include errors in application originally was created with broken state, for example in destination missed namespace - if originalApplication != nil && originalApplication.Status.OperationState != nil { - errors = append(errors, parseApplicationSyncResultErrors(originalApplication.Status.OperationState)...) - } - - if originalApplication != nil && originalApplication.Status.Conditions != nil { - errors = append(errors, parseApplicationSyncResultErrorsFromConditions(originalApplication.Status)...) - } - - if len(desiredState.RawManifest) == 0 && len(desiredState.CompiledManifest) != 0 { - // for handling helm defined resources, etc... - y, err := yaml.JSONToYAML([]byte(desiredState.CompiledManifest)) - if err == nil { - desiredState.RawManifest = string(y) - } - } - - applicationVersionsEvents, err := repoAppVersionsToEvent(applicationVersions) - if err != nil { - logCtx.Errorf("failed to convert appVersions: %v", err) - } - - source := events.ObjectSource{ - DesiredManifest: desiredState.CompiledManifest, - ActualManifest: *actualState.Manifest, - GitManifest: desiredState.RawManifest, - RepoURL: parentApplication.Status.Sync.ComparedTo.Source.RepoURL, - Path: desiredState.Path, - Revision: getApplicationLatestRevision(parentApplication), - OperationSyncRevision: getOperationRevision(parentApplication), - HistoryId: getLatestAppHistoryId(parentApplication), - AppName: parentApplication.Name, - AppNamespace: parentApplication.Namespace, - AppUID: string(parentApplication.ObjectMeta.UID), - AppLabels: parentApplication.Labels, - SyncStatus: string(rs.Status), - SyncStartedAt: syncStarted, - SyncFinishedAt: syncFinished, - Cluster: parentApplication.Spec.Destination.Server, - AppInstanceLabelKey: appInstanceLabelKey, - TrackingMethod: string(trackingMethod), - } - - if revisionMetadata != nil { - source.CommitMessage = revisionMetadata.Message - source.CommitAuthor = revisionMetadata.Author - source.CommitDate = &revisionMetadata.Date - } - - if rs.Health != nil { - source.HealthStatus = (*string)(&rs.Health.Status) - source.HealthMessage = &rs.Health.Message - if rs.Health.Status != health.HealthStatusHealthy { - errors = append(errors, parseAggregativeHealthErrors(rs, apptree)...) - } - } - - payload := events.EventPayload{ - Timestamp: ts, - Object: object, - Source: &source, - Errors: errors, - AppVersions: applicationVersionsEvents, - } - - logCtx.Infof("AppVersion before encoding: %v", safeString(payload.AppVersions.AppVersion)) - - payloadBytes, err := json.Marshal(&payload) - if err != nil { - return nil, fmt.Errorf("failed to marshal payload for resource %s/%s: %w", rs.Namespace, rs.Name, err) - } - - return &events.Event{Payload: payloadBytes, Name: es.Name}, nil -} - -func (s *applicationEventReporter) getApplicationEventPayload( - ctx context.Context, - a *appv1.Application, - es *events.EventSource, - ts string, - appInstanceLabelKey string, - trackingMethod appv1.TrackingMethod, - applicationVersions *apiclient.ApplicationVersions, -) (*events.Event, error) { - var ( - syncStarted = metav1.Now() - syncFinished *metav1.Time - logCtx = log.WithField("application", a.Name) - ) - - obj := appv1.Application{} - a.DeepCopyInto(&obj) - - // make sure there is type meta on object - obj.TypeMeta = metav1.TypeMeta{ - Kind: appv1reg.ApplicationKind, - APIVersion: appv1.SchemeGroupVersion.String(), - } - - if a.Status.OperationState != nil { - syncStarted = a.Status.OperationState.StartedAt - syncFinished = a.Status.OperationState.FinishedAt - } - - applicationSource := a.Spec.GetSource() - if !applicationSource.IsHelm() && (a.Status.Sync.Revision != "" || (a.Status.History != nil && len(a.Status.History) > 0)) { - revisionMetadata, err := s.getApplicationRevisionDetails(ctx, a, getOperationRevision(a)) - - if err != nil { - if !strings.Contains(err.Error(), "not found") { - return nil, fmt.Errorf("failed to get revision metadata: %w", err) - } - - logCtx.Warnf("failed to get revision metadata: %s, reporting application deletion event", err.Error()) - } else { - if obj.ObjectMeta.Labels == nil { - obj.ObjectMeta.Labels = map[string]string{} - } - - obj.ObjectMeta.Labels["app.meta.commit-date"] = revisionMetadata.Date.Format("2006-01-02T15:04:05.000Z") - obj.ObjectMeta.Labels["app.meta.commit-author"] = revisionMetadata.Author - obj.ObjectMeta.Labels["app.meta.commit-message"] = revisionMetadata.Message - } - } - - object, err := json.Marshal(&obj) - if err != nil { - return nil, fmt.Errorf("failed to marshal application event") - } - - actualManifest := string(object) - if a.DeletionTimestamp != nil { - actualManifest = "" // mark as deleted - logCtx.Info("reporting application deletion event") - } - - applicationVersionsEvents, err := repoAppVersionsToEvent(applicationVersions) - if err != nil { - logCtx.Errorf("failed to convert appVersions: %v", err) - } - - hs := string(a.Status.Health.Status) - source := &events.ObjectSource{ - DesiredManifest: "", - GitManifest: "", - ActualManifest: actualManifest, - RepoURL: a.Spec.GetSource().RepoURL, - CommitMessage: "", - CommitAuthor: "", - Path: "", - Revision: "", - OperationSyncRevision: "", - HistoryId: 0, - AppName: "", - AppUID: "", - AppLabels: map[string]string{}, - SyncStatus: string(a.Status.Sync.Status), - SyncStartedAt: syncStarted, - SyncFinishedAt: syncFinished, - HealthStatus: &hs, - HealthMessage: &a.Status.Health.Message, - Cluster: a.Spec.Destination.Server, - AppInstanceLabelKey: appInstanceLabelKey, - TrackingMethod: string(trackingMethod), - } - - payload := events.EventPayload{ - Timestamp: ts, - Object: object, - Source: source, - Errors: parseApplicationSyncResultErrorsFromConditions(a.Status), - AppVersions: applicationVersionsEvents, - } - - logCtx.Infof("AppVersion before encoding: %v", safeString(payload.AppVersions.AppVersion)) - - payloadBytes, err := json.Marshal(&payload) - if err != nil { - return nil, fmt.Errorf("failed to marshal payload for resource %s/%s: %w", a.Namespace, a.Name, err) - } - - return &events.Event{Payload: payloadBytes, Name: es.Name}, nil -} - -func getResourceDesiredState(rs *appv1.ResourceStatus, ds *apiclient.ManifestResponse, logger log.FieldLogger) *apiclient.Manifest { - if ds == nil { - return &apiclient.Manifest{} - } - for _, m := range ds.Manifests { - u, err := appv1.UnmarshalToUnstructured(m.CompiledManifest) - if err != nil { - logger.WithError(err).Warnf("failed to unmarshal compiled manifest") - continue - } - - if u == nil { - continue - } - - ns := text.FirstNonEmpty(u.GetNamespace(), rs.Namespace) - - if u.GroupVersionKind().String() == rs.GroupVersionKind().String() && - u.GetName() == rs.Name && - ns == rs.Namespace { - if rs.Kind == kube.SecretKind && rs.Version == "v1" { - m.RawManifest = m.CompiledManifest - } - - return m - } - } - - // no desired state for resource - // it's probably deleted from git - return &apiclient.Manifest{} -} - -func addDestNamespaceToManifest(resourceManifest []byte, rs *appv1.ResourceStatus) (*unstructured.Unstructured, error) { - u, err := appv1.UnmarshalToUnstructured(string(resourceManifest)) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal manifest: %w", err) - } - - if u.GetNamespace() == rs.Namespace { - return u, nil - } - - // need to change namespace - u.SetNamespace(rs.Namespace) - - return u, nil -} - -func addCommitDetailsToLabels(u *unstructured.Unstructured, revisionMetadata *appv1.RevisionMetadata) *unstructured.Unstructured { - if revisionMetadata == nil || u == nil { - return u - } - - if field, _, _ := unstructured.NestedFieldCopy(u.Object, "metadata", "labels"); field == nil { - _ = unstructured.SetNestedStringMap(u.Object, map[string]string{}, "metadata", "labels") - } - - _ = unstructured.SetNestedField(u.Object, revisionMetadata.Date.Format("2006-01-02T15:04:05.000Z"), "metadata", "labels", "app.meta.commit-date") - _ = unstructured.SetNestedField(u.Object, revisionMetadata.Author, "metadata", "labels", "app.meta.commit-author") - _ = unstructured.SetNestedField(u.Object, revisionMetadata.Message, "metadata", "labels", "app.meta.commit-message") - - return u -} - -func repoAppVersionsToEvent(applicationVersions *apiclient.ApplicationVersions) (*events.ApplicationVersions, error) { - applicationVersionsEvents := &events.ApplicationVersions{} - applicationVersionsData, _ := json.Marshal(applicationVersions) - err := json.Unmarshal(applicationVersionsData, applicationVersionsEvents) - if err != nil { - return nil, err - } - return applicationVersionsEvents, nil -} - -func safeString(s *string) string { - if s == nil { - return "" - } - return *s -} diff --git a/server/application/application_event_reporter_test.go b/server/application/application_event_reporter_test.go deleted file mode 100644 index 756b1d350f1a9..0000000000000 --- a/server/application/application_event_reporter_test.go +++ /dev/null @@ -1,452 +0,0 @@ -package application - -import ( - "context" - "encoding/json" - "testing" - "time" - - "github.com/ghodss/yaml" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - log "github.com/sirupsen/logrus" - "google.golang.org/grpc" - "k8s.io/apimachinery/pkg/runtime" - - appsv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - fakeapps "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned/fake" - appinformer "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions" - applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" - - "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/tools/cache" - - apps "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned/fake" - appinformers "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions/application/v1alpha1" - servercache "github.com/argoproj/argo-cd/v2/server/cache" - "github.com/argoproj/argo-cd/v2/test" - cacheutil "github.com/argoproj/argo-cd/v2/util/cache" - appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate" - "github.com/argoproj/argo-cd/v2/util/rbac" - - "github.com/stretchr/testify/assert" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/argoproj/argo-cd/v2/common" - "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" - "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" - "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - "github.com/argoproj/argo-cd/v2/reposerver/apiclient" - "github.com/argoproj/argo-cd/v2/util/argo" -) - -func TestGetResourceEventPayload(t *testing.T) { - t.Run("Deleting timestamp is empty", func(t *testing.T) { - - app := v1alpha1.Application{} - rs := v1alpha1.ResourceStatus{} - es := events.EventSource{} - - man := "{ \"key\" : \"manifest\" }" - - actualState := application.ApplicationResourceResponse{ - Manifest: &man, - } - desiredState := apiclient.Manifest{ - CompiledManifest: "{ \"key\" : \"manifest\" }", - } - appTree := v1alpha1.ApplicationTree{} - revisionMetadata := v1alpha1.RevisionMetadata{ - Author: "demo usert", - Date: metav1.Time{}, - Message: "some message", - } - - event, err := getResourceEventPayload(&app, &rs, &es, &actualState, &desiredState, &appTree, true, "", nil, &revisionMetadata, nil, common.LabelKeyAppInstance, argo.TrackingMethodLabel, &apiclient.ApplicationVersions{}) - assert.NoError(t, err) - - var eventPayload events.EventPayload - - err = json.Unmarshal(event.Payload, &eventPayload) - assert.NoError(t, err) - - assert.Equal(t, "{ \"key\" : \"manifest\" }", eventPayload.Source.DesiredManifest) - assert.Equal(t, "{ \"key\" : \"manifest\" }", eventPayload.Source.ActualManifest) - }) - - t.Run("Deleting timestamp is empty", func(t *testing.T) { - - app := v1alpha1.Application{ - ObjectMeta: metav1.ObjectMeta{ - DeletionTimestamp: &metav1.Time{}, - }, - Status: v1alpha1.ApplicationStatus{}, - } - rs := v1alpha1.ResourceStatus{} - es := events.EventSource{} - man := "{ \"key\" : \"manifest\" }" - actualState := application.ApplicationResourceResponse{ - Manifest: &man, - } - desiredState := apiclient.Manifest{ - CompiledManifest: "{ \"key\" : \"manifest\" }", - } - appTree := v1alpha1.ApplicationTree{} - revisionMetadata := v1alpha1.RevisionMetadata{ - Author: "demo usert", - Date: metav1.Time{}, - Message: "some message", - } - - event, err := getResourceEventPayload(&app, &rs, &es, &actualState, &desiredState, &appTree, true, "", nil, &revisionMetadata, nil, common.LabelKeyAppInstance, argo.TrackingMethodLabel, &apiclient.ApplicationVersions{}) - assert.NoError(t, err) - - var eventPayload events.EventPayload - - err = json.Unmarshal(event.Payload, &eventPayload) - assert.NoError(t, err) - - assert.Equal(t, "", eventPayload.Source.DesiredManifest) - assert.Equal(t, "", eventPayload.Source.ActualManifest) - }) -} - -func TestGetApplicationLatestRevision(t *testing.T) { - appRevision := "a-revision" - history1Revision := "history-revision-1" - history2Revision := "history-revision-2" - - t.Run("resource revision should be taken from sync.revision", func(t *testing.T) { - noStatusHistoryAppMock := v1alpha1.Application{ - Status: v1alpha1.ApplicationStatus{ - Sync: v1alpha1.SyncStatus{ - Revision: appRevision, - }, - }, - } - - revisionResult := getApplicationLatestRevision(&noStatusHistoryAppMock) - assert.Equal(t, revisionResult, appRevision) - - emptyStatusHistoryAppMock := v1alpha1.Application{ - Status: v1alpha1.ApplicationStatus{ - Sync: v1alpha1.SyncStatus{ - Revision: appRevision, - }, - History: []v1alpha1.RevisionHistory{}, - }, - } - - revision2Result := getApplicationLatestRevision(&emptyStatusHistoryAppMock) - assert.Equal(t, revision2Result, appRevision) - }) - - t.Run("resource revision should be taken from latest history.revision", func(t *testing.T) { - appMock := v1alpha1.Application{ - Status: v1alpha1.ApplicationStatus{ - Sync: v1alpha1.SyncStatus{ - Revision: appRevision, - }, - History: []v1alpha1.RevisionHistory{ - { - Revision: history1Revision, - }, - { - Revision: history2Revision, - }, - }, - }, - } - - revisionResult := getApplicationLatestRevision(&appMock) - assert.Equal(t, revisionResult, history2Revision) - }) -} - -func TestGetLatestAppHistoryId(t *testing.T) { - history1Id := int64(1) - history2Id := int64(2) - - t.Run("resource revision should be 0", func(t *testing.T) { - noStatusHistoryAppMock := v1alpha1.Application{} - - idResult := getLatestAppHistoryId(&noStatusHistoryAppMock) - assert.Equal(t, idResult, int64(0)) - - emptyStatusHistoryAppMock := v1alpha1.Application{ - Status: v1alpha1.ApplicationStatus{ - History: []v1alpha1.RevisionHistory{}, - }, - } - - id2Result := getLatestAppHistoryId(&emptyStatusHistoryAppMock) - assert.Equal(t, id2Result, int64(0)) - }) - - t.Run("resource revision should be taken from latest history.Id", func(t *testing.T) { - appMock := v1alpha1.Application{ - Status: v1alpha1.ApplicationStatus{ - History: []v1alpha1.RevisionHistory{ - { - ID: history1Id, - }, - { - ID: history2Id, - }, - }, - }, - } - - revisionResult := getLatestAppHistoryId(&appMock) - assert.Equal(t, revisionResult, history2Id) - }) -} - -func newAppLister(objects ...runtime.Object) applisters.ApplicationLister { - fakeAppsClientset := fakeapps.NewSimpleClientset(objects...) - factory := appinformer.NewSharedInformerFactoryWithOptions(fakeAppsClientset, 0, appinformer.WithNamespace(""), appinformer.WithTweakListOptions(func(options *metav1.ListOptions) {})) - appsInformer := factory.Argoproj().V1alpha1().Applications() - for _, obj := range objects { - switch obj.(type) { - case *appsv1.Application: - _ = appsInformer.Informer().GetStore().Add(obj) - } - } - appLister := appsInformer.Lister() - return appLister -} - -func fakeServer() *Server { - cm := test.NewFakeConfigMap() - secret := test.NewFakeSecret() - kubeclientset := fake.NewSimpleClientset(cm, secret) - appClientSet := apps.NewSimpleClientset() - - appInformer := appinformers.NewApplicationInformer(appClientSet, "", time.Minute, cache.Indexers{}) - - guestbookApp := &appsv1.Application{ - TypeMeta: metav1.TypeMeta{ - Kind: "Application", - APIVersion: "argoproj.io/v1alpha1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "guestbook", - Namespace: testNamespace, - }, - Spec: appsv1.ApplicationSpec{ - Project: "default", - Source: &appsv1.ApplicationSource{ - RepoURL: "https://test", - TargetRevision: "HEAD", - Helm: &appsv1.ApplicationSourceHelm{ - ValueFiles: []string{"values.yaml"}, - }, - }, - }, - Status: appsv1.ApplicationStatus{ - History: appsv1.RevisionHistories{ - { - Revision: "abcdef123567", - Source: appsv1.ApplicationSource{ - RepoURL: "https://test", - TargetRevision: "HEAD", - Helm: &appsv1.ApplicationSourceHelm{ - ValueFiles: []string{"values-old.yaml"}, - }, - }, - }, - }, - }, - } - - appLister := newAppLister(guestbookApp) - // _, _ := test.NewInMemoryRedis() - - cache := servercache.NewCache( - appstatecache.NewCache( - cacheutil.NewCache(cacheutil.NewInMemoryCache(1*time.Hour)), - 1*time.Minute, - ), - 1*time.Minute, - 1*time.Minute, - 1*time.Minute, - ) - - enf := rbac.NewEnforcer(kubeclientset, testNamespace, common.ArgoCDRBACConfigMapName, nil) - server, _ := NewServer(test.FakeArgoCDNamespace, kubeclientset, appClientSet, appLister, appInformer, nil, nil, cache, nil, nil, enf, nil, nil, nil, nil) - return server.(*Server) -} - -func TestShouldSendEvent(t *testing.T) { - serverInstance := fakeServer() - t.Run("should send because cache is missing", func(t *testing.T) { - eventReporter := applicationEventReporter{ - server: serverInstance, - } - - app := &v1alpha1.Application{} - rs := v1alpha1.ResourceStatus{} - - res := eventReporter.shouldSendResourceEvent(app, rs) - assert.True(t, res) - }) - - t.Run("should not send - same entities", func(t *testing.T) { - eventReporter := applicationEventReporter{ - server: serverInstance, - } - - app := &v1alpha1.Application{} - rs := v1alpha1.ResourceStatus{} - - _ = eventReporter.server.cache.SetLastResourceEvent(app, rs, time.Minute, "") - - res := eventReporter.shouldSendResourceEvent(app, rs) - assert.False(t, res) - }) - - t.Run("should send - different entities", func(t *testing.T) { - eventReporter := applicationEventReporter{ - server: serverInstance, - } - - app := &v1alpha1.Application{} - rs := v1alpha1.ResourceStatus{} - - _ = eventReporter.server.cache.SetLastResourceEvent(app, rs, time.Minute, "") - - rs.Status = v1alpha1.SyncStatusCodeOutOfSync - - res := eventReporter.shouldSendResourceEvent(app, rs) - assert.True(t, res) - }) - -} - -type MockEventing_StartEventSourceServer struct { - grpc.ServerStream -} - -var result func(*events.Event) error - -func (m *MockEventing_StartEventSourceServer) Send(event *events.Event) error { - return result(event) -} - -func TestStreamApplicationEvent(t *testing.T) { - serverInstance := fakeServer() - t.Run("root application", func(t *testing.T) { - eventReporter := applicationEventReporter{ - server: serverInstance, - } - - app := &v1alpha1.Application{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "argoproj.io/v1alpha1", - Kind: "Application", - }, - } - name := "name" - - result = func(event *events.Event) error { - var payload events.EventPayload - _ = json.Unmarshal(event.Payload, &payload) - - var actualApp v1alpha1.Application - _ = json.Unmarshal([]byte(payload.Source.ActualManifest), &actualApp) - assert.Equal(t, *app, actualApp) - return nil - } - - _ = eventReporter.streamApplicationEvents(context.Background(), log.New(), app, &events.EventSource{Name: &name}, &MockEventing_StartEventSourceServer{}, "", false, common.LabelKeyAppInstance, argo.TrackingMethodLabel) - }) - -} - -func TestGetResourceEventPayloadWithoutRevision(t *testing.T) { - app := v1alpha1.Application{} - rs := v1alpha1.ResourceStatus{} - es := events.EventSource{} - - mf := "{ \"key\" : \"manifest\" }" - - actualState := application.ApplicationResourceResponse{ - Manifest: &mf, - } - desiredState := apiclient.Manifest{ - CompiledManifest: "{ \"key\" : \"manifest\" }", - } - appTree := v1alpha1.ApplicationTree{} - - _, err := getResourceEventPayload(&app, &rs, &es, &actualState, &desiredState, &appTree, true, "", nil, nil, nil, common.LabelKeyAppInstance, argo.TrackingMethodLabel, &apiclient.ApplicationVersions{}) - assert.NoError(t, err) - -} - -func StrToUnstructured(jsonStr string) *unstructured.Unstructured { - obj := make(map[string]interface{}) - err := yaml.Unmarshal([]byte(jsonStr), &obj) - if err != nil { - panic(err) - } - return &unstructured.Unstructured{Object: obj} -} - -func TestAddCommitDetailsToLabels(t *testing.T) { - revisionMetadata := v1alpha1.RevisionMetadata{ - Author: "demo usert", - Date: metav1.Time{}, - Message: "some message", - } - - t.Run("set labels when lable object missing", func(t *testing.T) { - resource := StrToUnstructured(` - apiVersion: v1 - kind: Service - metadata: - name: helm-guestbook - namespace: default - resourceVersion: "123" - uid: "4" - spec: - selector: - app: guestbook - type: LoadBalancer - status: - loadBalancer: - ingress: - - hostname: localhost`, - ) - - result := addCommitDetailsToLabels(resource, &revisionMetadata) - labels := result.GetLabels() - assert.Equal(t, revisionMetadata.Author, labels["app.meta.commit-author"]) - assert.Equal(t, revisionMetadata.Message, labels["app.meta.commit-message"]) - }) - - t.Run("set labels when labels present", func(t *testing.T) { - resource := StrToUnstructured(` - apiVersion: v1 - kind: Service - metadata: - name: helm-guestbook - namespace: default - labels: - link: http://my-grafana.com/pre-generated-link - spec: - selector: - app: guestbook - type: LoadBalancer - status: - loadBalancer: - ingress: - - hostname: localhost`, - ) - - result := addCommitDetailsToLabels(resource, &revisionMetadata) - labels := result.GetLabels() - assert.Equal(t, revisionMetadata.Author, labels["app.meta.commit-author"]) - assert.Equal(t, revisionMetadata.Message, labels["app.meta.commit-message"]) - assert.Equal(t, "http://my-grafana.com/pre-generated-link", result.GetLabels()["link"]) - }) -} diff --git a/server/application/application_validate_src_and_dest.go b/server/application/application_validate_src_and_dest.go new file mode 100644 index 0000000000000..0aa12a1fe6bdf --- /dev/null +++ b/server/application/application_validate_src_and_dest.go @@ -0,0 +1,90 @@ +package application + +import ( + "context" + "fmt" + "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" + appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" + "github.com/argoproj/argo-cd/v2/util/argo" + "github.com/argoproj/argo-cd/v2/util/db" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + apierr "k8s.io/apimachinery/pkg/api/errors" +) + +func (s *Server) ValidateSrcAndDst(ctx context.Context, requset *application.ApplicationValidationRequest) (*application.ApplicationValidateResponse, error) { + app := requset.Application + proj, err := argo.GetAppProject(app, applisters.NewAppProjectLister(s.projInformer.GetIndexer()), s.ns, s.settingsMgr, s.db, ctx) + if err != nil { + entity := projectEntity + if apierr.IsNotFound(err) { + errMsg := fmt.Sprintf("application references project %s which does not exist", app.Spec.Project) + return &application.ApplicationValidateResponse{ + Error: &errMsg, + Entity: &entity, + }, nil + } + errMsg := err.Error() + return &application.ApplicationValidateResponse{ + Error: &errMsg, + Entity: &entity, + }, nil + } + + if err := validateDestination(ctx, &app.Spec.Destination, s.db); err != nil { + entity := destinationEntity + errMsg := fmt.Sprintf("application destination spec for %s is invalid: %s", app.ObjectMeta.Name, err.Error()) + return &application.ApplicationValidateResponse{ + Error: &errMsg, + Entity: &entity, + }, nil + } + var conditions []appv1.ApplicationCondition + conditions, err = argo.ValidateRepo(ctx, app, s.repoClientset, s.db, s.kubectl, proj, s.settingsMgr) + if err != nil { + entity := sourceEntity + errMsg := err.Error() + return &application.ApplicationValidateResponse{ + Error: &errMsg, + Entity: &entity, + }, nil + } + if len(conditions) > 0 { + entity := sourceEntity + errMsg := fmt.Sprintf("application spec for %s is invalid: %s", app.ObjectMeta.Name, argo.FormatAppConditions(conditions)) + return &application.ApplicationValidateResponse{ + Error: &errMsg, + Entity: &entity, + }, nil + } + return &application.ApplicationValidateResponse{ + Error: nil, + Entity: nil, + }, nil +} + +// validates destination name (argo.ValidateDestination) and server with extra logic +func validateDestination(ctx context.Context, dest *appv1.ApplicationDestination, db db.ArgoDB) error { + err := argo.ValidateDestination(ctx, dest, db) + + if err != nil { + return err + } + + if dest.Server != "" { + // Ensure the k8s cluster the app is referencing, is configured in Argo CD + _, err := db.GetCluster(ctx, dest.Server) + if err != nil { + if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.NotFound { + return fmt.Errorf("cluster '%s' has not been configured", dest.Server) + } else { + return err + } + } + } else if dest.Server == "" { + return fmt.Errorf("destination server missing from app spec") + } + + return nil +} diff --git a/server/application/events.proto b/server/application/events.proto index bcd8bf16a265d..c946bf6214bc9 100644 --- a/server/application/events.proto +++ b/server/application/events.proto @@ -12,13 +12,6 @@ import "k8s.io/apimachinery/pkg/apis/meta/v1/generated.proto"; import "gogoproto/gogo.proto"; import "google/api/annotations.proto"; -// Eventing service -service Eventing { - rpc StartEventSource(EventSource) returns (stream Event) { - option (google.api.http).get = "/api/v1/stream/events"; - } -} - message EventSource { // The event source name. required string name = 1; diff --git a/server/server.go b/server/server.go index b8f0fc1a76dee..9ca4492493289 100644 --- a/server/server.go +++ b/server/server.go @@ -68,7 +68,6 @@ import ( applicationsetpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/applicationset" certificatepkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/certificate" clusterpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/cluster" - eventspkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" gpgkeypkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/gpgkey" projectpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/project" repocredspkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/repocreds" @@ -800,8 +799,6 @@ func (a *ArgoCDServer) newGRPCServer() (*grpc.Server, application.AppResourceTre ))) grpcS := grpc.NewServer(sOpts...) - srv := a.serviceSet.ApplicationService.(*application.Server) - eventspkg.RegisterEventingServer(grpcS, srv) versionpkg.RegisterVersionServiceServer(grpcS, a.serviceSet.VersionService) clusterpkg.RegisterClusterServiceServer(grpcS, a.serviceSet.ClusterService) applicationpkg.RegisterApplicationServiceServer(grpcS, a.serviceSet.ApplicationService) diff --git a/util/settings/settings.go b/util/settings/settings.go index 316f78fc4af2c..7eb16e75f8f10 100644 --- a/util/settings/settings.go +++ b/util/settings/settings.go @@ -218,14 +218,6 @@ type KustomizeSettings struct { Versions []KustomizeVersion } -// CodefreshReporterVersion includes all cf reporter versions -type CodefreshReporterVersion string - -const ( - CodefreshV1ReporterVersion CodefreshReporterVersion = "v1" - CodefreshV2ReporterVersion CodefreshReporterVersion = "v2" -) - var ( ByClusterURLIndexer = "byClusterURL" byClusterURLIndexerFunc = func(obj interface{}) ([]string, error) { @@ -446,8 +438,6 @@ const ( settingsWebhookMaxPayloadSizeMB = "webhook.maxPayloadSizeMB" // settingsApplicationInstanceLabelKey is the key to configure injected app instance label key settingsApplicationInstanceLabelKey = "application.instanceLabelKey" - // settingsCodefreshReporterVersion is the key to configure injected app instance label key - settingsCodefreshReporterVersion = "codefresh.reporterVersion" // settingsResourceTrackingMethodKey is the key to configure tracking method for application resources settingsResourceTrackingMethodKey = "application.resourceTrackingMethod" // resourcesCustomizationsKey is the key to the map of resource overrides @@ -781,18 +771,6 @@ func (mgr *SettingsManager) GetAppInstanceLabelKey() (string, error) { return label, nil } -func (mgr *SettingsManager) GetCodefreshReporterVersion() (string, error) { - argoCDCM, err := mgr.getConfigMap() - if err != nil { - return "", err - } - label := argoCDCM.Data[settingsCodefreshReporterVersion] - if label == "" { - return string(CodefreshV1ReporterVersion), nil - } - return label, nil -} - func (mgr *SettingsManager) GetKustomizeSetNamespaceEnabled() bool { argoCDCM, err := mgr.getConfigMap() if err != nil {