From ceac379677af05de810431f6c9b098b1b1a30b08 Mon Sep 17 00:00:00 2001 From: roi-codefresh Date: Mon, 25 Jul 2022 11:21:07 +0300 Subject: [PATCH] fixing events (#125) * fixing events * bump version --- VERSION | 2 +- server/application/application.go | 12 ++++-- .../application/application_event_reporter.go | 38 +++++++++++++------ .../application_event_reporter_test.go | 2 +- 4 files changed, 37 insertions(+), 17 deletions(-) diff --git a/VERSION b/VERSION index 99429eb5da7a3..3a8f4912f563f 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.3.4-cap-CR-12256-fix-res-cache-key +2.3.4-cap-CR-13317-fixing-events diff --git a/server/application/application.go b/server/application/application.go index 26d03c07e2e3a..bd0951337954f 100644 --- a/server/application/application.go +++ b/server/application/application.go @@ -877,7 +877,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing // sendIfPermitted is a helper to send the application to the client's streaming channel if the // caller has RBAC privileges permissions to view it - sendIfPermitted := func(a appv1.Application, eventType watch.EventType, ts string) { + sendIfPermitted := func(a appv1.Application, eventType watch.EventType, ts string, ignoreResourceCache bool) { if eventType == watch.Bookmark { return // ignore this event } @@ -896,7 +896,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing return } - err := s.applicationEventReporter.streamApplicationEvents(stream.Context(), &a, es, stream, ts) + err := s.applicationEventReporter.streamApplicationEvents(stream.Context(), &a, es, stream, ts, ignoreResourceCache) if err != nil { logCtx.WithError(err).Error("failed to stream application events") return @@ -910,13 +910,17 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing events := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize) - unsubscribe := s.appBroadcaster.Subscribe(events, s.applicationEventReporter.shouldSendApplicationEvent) + unsubscribe := s.appBroadcaster.Subscribe(events) defer unsubscribe() for { select { case event := <-events: + shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event) + if !shouldProcess { + continue + } ts := time.Now().Format("2006-01-02T15:04:05.000Z") - sendIfPermitted(event.Application, event.Type, ts) + sendIfPermitted(event.Application, event.Type, ts, ignoreResourceCache) case <-stream.Context().Done(): return nil } diff --git a/server/application/application_event_reporter.go b/server/application/application_event_reporter.go index 2be25b57dd795..d036ebbb1a5fa 100644 --- a/server/application/application_event_reporter.go +++ b/server/application/application_event_reporter.go @@ -103,12 +103,13 @@ func (s *applicationEventReporter) streamApplicationEvents( es *events.EventSource, stream events.Eventing_StartEventSourceServer, ts string, + ignoreResourceCache bool, ) error { var ( logCtx = log.WithField("app", a.Name) ) - logCtx.Info("streaming application events") + logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events") appTree, err := s.server.getAppResources(ctx, a) if err != nil { @@ -133,7 +134,7 @@ func (s *applicationEventReporter) streamApplicationEvents( revisionMetadata, _ := s.getApplicationHistoryRevisionDetails(ctx, a) - s.processResource(ctx, *rs, parentApplicationEntity, logCtx, ts, desiredManifests, stream, appTree, es, manifestGenErr, a, revisionMetadata) + s.processResource(ctx, *rs, parentApplicationEntity, logCtx, ts, desiredManifests, stream, appTree, es, manifestGenErr, a, revisionMetadata, false) } else { // application events for child apps would be sent by its parent app // as resource event @@ -165,7 +166,7 @@ func (s *applicationEventReporter) streamApplicationEvents( if isApp(rs) { continue } - s.processResource(ctx, rs, a, logCtx, ts, desiredManifests, stream, appTree, es, manifestGenErr, nil, revisionMetadata) + s.processResource(ctx, rs, a, logCtx, ts, desiredManifests, stream, appTree, es, manifestGenErr, nil, revisionMetadata, ignoreResourceCache) } return nil } @@ -183,13 +184,23 @@ func (s *applicationEventReporter) processResource( manifestGenErr bool, originalApplication *appv1.Application, revisionMetadata *appv1.RevisionMetadata, + ignoreResourceCache bool, ) { logCtx = logCtx.WithFields(log.Fields{ "gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind), "resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name), }) - if !s.shouldSendResourceEvent(parentApplication, rs) { + if rs.Health == nil && rs.Status == appv1.SyncStatusCodeSynced { + // for resources without health status we need to add 'Healthy' status + // when they are synced because we might have sent an event with 'Missing' + // status earlier and they would be stuck in it if we don't switch to 'Healthy' + rs.Health = &appv1.HealthStatus{ + Status: health.HealthStatusHealthy, + } + } + + if !ignoreResourceCache && !s.shouldSendResourceEvent(parentApplication, rs) { return } @@ -238,17 +249,17 @@ func (s *applicationEventReporter) processResource( } } -func (s *applicationEventReporter) shouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) bool { +func (s *applicationEventReporter) shouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool) { logCtx := log.WithField("app", ae.Application.Name) if ae.Type == watch.Deleted { logCtx.Info("application deleted") - return true + return true, false } cachedApp, err := s.server.cache.GetLastApplicationEvent(&ae.Application) if err != nil || cachedApp == nil { - return true + return true, false } cachedApp.Status.ReconciledAt = ae.Application.Status.ReconciledAt // ignore those in the diff @@ -260,22 +271,27 @@ func (s *applicationEventReporter) shouldSendApplicationEvent(ae *appv1.Applicat ae.Application.Status.Conditions[i].LastTransitionTime = nil } + // check if application changed to healthy status + if ae.Application.Status.Health.Status == health.HealthStatusHealthy && cachedApp.Status.Health.Status != health.HealthStatusHealthy { + return true, true + } + if !reflect.DeepEqual(ae.Application.Spec, cachedApp.Spec) { logCtx.Info("application spec changed") - return true + return true, false } if !reflect.DeepEqual(ae.Application.Status, cachedApp.Status) { logCtx.Info("application status changed") - return true + return true, false } if !reflect.DeepEqual(ae.Application.Operation, cachedApp.Operation) { logCtx.Info("application operation changed") - return true + return true, false } - return false + return false, false } func isApp(rs appv1.ResourceStatus) bool { diff --git a/server/application/application_event_reporter_test.go b/server/application/application_event_reporter_test.go index 1fd793248f021..53012ec48811c 100644 --- a/server/application/application_event_reporter_test.go +++ b/server/application/application_event_reporter_test.go @@ -346,7 +346,7 @@ func TestStreamApplicationEvent(t *testing.T) { return nil } - _ = eventReporter.streamApplicationEvents(context.Background(), app, &events.EventSource{Name: &name}, &MockEventing_StartEventSourceServer{}, "") + _ = eventReporter.streamApplicationEvents(context.Background(), app, &events.EventSource{Name: &name}, &MockEventing_StartEventSourceServer{}, "", false) }) }