Skip to content

Commit

Permalink
v2 reporter: changes after pr review
Browse files Browse the repository at this point in the history
  • Loading branch information
oleksandr-codefresh committed Jul 3, 2024
1 parent 4019658 commit e614d06
Show file tree
Hide file tree
Showing 13 changed files with 503 additions and 465 deletions.
17 changes: 17 additions & 0 deletions event_reporter/reporter/app_revision.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package reporter

import (
"context"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
)

func (s *applicationEventReporter) getApplicationRevisionDetails(ctx context.Context, a *appv1.Application, revision string) (*appv1.RevisionMetadata, error) {
project := a.Spec.GetProject()
return s.applicationServiceClient.RevisionMetadata(ctx, &application.RevisionMetadataQuery{
Name: &a.Name,
AppNamespace: &a.Namespace,
Revision: &revision,
Project: &project,
})
}
84 changes: 0 additions & 84 deletions event_reporter/reporter/app_revision_helpers.go

This file was deleted.

43 changes: 22 additions & 21 deletions event_reporter/reporter/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/argoproj/argo-cd/v2/event_reporter/utils"
"math"
"reflect"
"strings"
Expand Down Expand Up @@ -63,13 +64,13 @@ func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceCli
}

func (s *applicationEventReporter) shouldSendResourceEvent(a *appv1.Application, rs appv1.ResourceStatus) bool {
logCtx := logWithResourceStatus(log.WithFields(log.Fields{
logCtx := utils.LogWithResourceStatus(log.WithFields(log.Fields{
"app": a.Name,
"gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind),
"resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name),
}), rs)

cachedRes, err := s.cache.GetLastResourceEvent(a, rs, getApplicationLatestRevision(a))
cachedRes, err := s.cache.GetLastResourceEvent(a, rs, utils.GetApplicationLatestRevision(a))
if err != nil {
logCtx.Debug("resource not in cache")
return true
Expand Down Expand Up @@ -144,19 +145,19 @@ func (s *applicationEventReporter) StreamApplicationEvents(

logCtx.Info("getting parent application name")

parentAppIdentity := getParentAppIdentity(a, appInstanceLabelKey, trackingMethod)
parentAppIdentity := utils.GetParentAppIdentity(a, appInstanceLabelKey, trackingMethod)

if isChildApp(parentAppIdentity) {
if utils.IsChildApp(parentAppIdentity) {
logCtx.Info("processing as child application")
parentApplicationEntity, err := s.applicationServiceClient.Get(ctx, &application.ApplicationQuery{
Name: &parentAppIdentity.name,
AppNamespace: &parentAppIdentity.namespace,
Name: &parentAppIdentity.Name,
AppNamespace: &parentAppIdentity.Namespace,
})
if err != nil {
return fmt.Errorf("failed to get parent application entity: %w", err)
}

rs := getAppAsResource(a)
rs := utils.GetAppAsResource(a)

parentDesiredManifests, err, manifestGenErr := s.getDesiredManifests(ctx, parentApplicationEntity, logCtx)
if err != nil {
Expand All @@ -165,13 +166,13 @@ func (s *applicationEventReporter) StreamApplicationEvents(

// helm app hasnt revision
// TODO: add check if it helm application
parentOperationRevision := getOperationRevision(parentApplicationEntity)
parentOperationRevision := utils.GetOperationRevision(parentApplicationEntity)
parentRevisionMetadata, err := s.getApplicationRevisionDetails(ctx, parentApplicationEntity, parentOperationRevision)
if err != nil {
logCtx.WithError(err).Warn("failed to get parent application's revision metadata, resuming")
}

setHealthStatusIfMissing(rs)
utils.SetHealthStatusIfMissing(rs)
err = s.processResource(ctx, *rs, parentApplicationEntity, logCtx, ts, parentDesiredManifests, appTree, manifestGenErr, a, parentRevisionMetadata, appInstanceLabelKey, trackingMethod, desiredManifests.ApplicationVersions)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricChildAppEventType, metrics.MetricEventUnknownErrorType, a.Name)
Expand All @@ -193,7 +194,7 @@ func (s *applicationEventReporter) StreamApplicationEvents(
return nil
}

logWithAppStatus(a, logCtx, ts).Info("sending root application event")
utils.LogWithAppStatus(a, logCtx, ts).Info("sending root application event")
if err := s.codefreshClient.SendEvent(ctx, a.Name, appEvent); err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventDeliveryErrorType, a.Name)
return fmt.Errorf("failed to send event for root application %s/%s: %w", a.Namespace, a.Name, err)
Expand All @@ -202,14 +203,14 @@ func (s *applicationEventReporter) StreamApplicationEvents(
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricParentAppEventType, reconcileDuration)
}

revisionMetadata, _ := s.getApplicationRevisionDetails(ctx, a, getOperationRevision(a))
revisionMetadata, _ := s.getApplicationRevisionDetails(ctx, a, utils.GetOperationRevision(a))
// for each resource in the application get desired and actual state,
// then stream the event
for _, rs := range a.Status.Resources {
if isApp(rs) {
if utils.IsApp(rs) {
continue
}
setHealthStatusIfMissing(&rs)
utils.SetHealthStatusIfMissing(&rs)
if !ignoreResourceCache && !s.shouldSendResourceEvent(a, rs) {
s.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricResourceEventType, a.Name)
continue
Expand Down Expand Up @@ -239,7 +240,7 @@ func (s *applicationEventReporter) getAppForResourceReporting(
return a, revisionMetadata
}

revisionMetadataToReport, err := s.getApplicationRevisionDetails(ctx, latestAppStatus, getOperationRevision(latestAppStatus))
revisionMetadataToReport, err := s.getApplicationRevisionDetails(ctx, latestAppStatus, utils.GetOperationRevision(latestAppStatus))

if err != nil {
return a, revisionMetadata
Expand All @@ -264,7 +265,7 @@ func (s *applicationEventReporter) processResource(
applicationVersions *apiclient.ApplicationVersions,
) error {
metricsEventType := metrics.MetricResourceEventType
if isApp(rs) {
if utils.IsApp(rs) {
metricsEventType = metrics.MetricChildAppEventType
}

Expand All @@ -290,7 +291,7 @@ func (s *applicationEventReporter) processResource(
var originalAppRevisionMetadata *appv1.RevisionMetadata = nil

if originalApplication != nil {
originalAppRevisionMetadata, _ = s.getApplicationRevisionDetails(ctx, originalApplication, getOperationRevision(originalApplication))
originalAppRevisionMetadata, _ = s.getApplicationRevisionDetails(ctx, originalApplication, utils.GetOperationRevision(originalApplication))
}

ev, err := getResourceEventPayload(parentApplicationToReport, &rs, actualState, desiredState, appTree, manifestGenErr, ts, originalApplication, revisionMetadataToReport, originalAppRevisionMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
Expand All @@ -302,11 +303,11 @@ func (s *applicationEventReporter) processResource(

appRes := appv1.Application{}
appName := ""
if isApp(rs) && actualState.Manifest != nil && json.Unmarshal([]byte(*actualState.Manifest), &appRes) == nil {
logWithAppStatus(&appRes, logCtx, ts).Info("streaming resource event")
if utils.IsApp(rs) && actualState.Manifest != nil && json.Unmarshal([]byte(*actualState.Manifest), &appRes) == nil {
utils.LogWithAppStatus(&appRes, logCtx, ts).Info("streaming resource event")
appName = appRes.Name
} else {
logWithResourceStatus(logCtx, rs).Info("streaming resource event")
utils.LogWithResourceStatus(logCtx, rs).Info("streaming resource event")
appName = rs.Name
}

Expand All @@ -320,15 +321,15 @@ func (s *applicationEventReporter) processResource(
return nil
}

if err := s.cache.SetLastResourceEvent(parentApplicationToReport, rs, resourceEventCacheExpiration, getApplicationLatestRevision(parentApplicationToReport)); err != nil {
if err := s.cache.SetLastResourceEvent(parentApplicationToReport, rs, resourceEventCacheExpiration, utils.GetApplicationLatestRevision(parentApplicationToReport)); err != nil {
logCtx.WithError(err).Warn("failed to cache resource event")
}

return nil
}

func (s *applicationEventReporter) getResourceActualState(ctx context.Context, logCtx *log.Entry, metricsEventType metrics.MetricEventType, rs appv1.ResourceStatus, parentApplication *appv1.Application, childApplication *appv1.Application) (*application.ApplicationResourceResponse, error) {
if isApp(rs) {
if utils.IsApp(rs) {
if childApplication.IsEmptyTypeMeta() {
// make sure there is type meta on object
childApplication.SetDefaultTypeMeta()
Expand Down
Loading

0 comments on commit e614d06

Please sign in to comment.