Skip to content

Commit

Permalink
fixing events (#125)
Browse files Browse the repository at this point in the history
* fixing events

* bump version
  • Loading branch information
roi-codefresh authored Jul 25, 2022
1 parent 5787550 commit ceac379
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 17 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.3.4-cap-CR-12256-fix-res-cache-key
2.3.4-cap-CR-13317-fixing-events
12 changes: 8 additions & 4 deletions server/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing

// sendIfPermitted is a helper to send the application to the client's streaming channel if the
// caller has RBAC privileges permissions to view it
sendIfPermitted := func(a appv1.Application, eventType watch.EventType, ts string) {
sendIfPermitted := func(a appv1.Application, eventType watch.EventType, ts string, ignoreResourceCache bool) {
if eventType == watch.Bookmark {
return // ignore this event
}
Expand All @@ -896,7 +896,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
return
}

err := s.applicationEventReporter.streamApplicationEvents(stream.Context(), &a, es, stream, ts)
err := s.applicationEventReporter.streamApplicationEvents(stream.Context(), &a, es, stream, ts, ignoreResourceCache)
if err != nil {
logCtx.WithError(err).Error("failed to stream application events")
return
Expand All @@ -910,13 +910,17 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing

events := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize)

unsubscribe := s.appBroadcaster.Subscribe(events, s.applicationEventReporter.shouldSendApplicationEvent)
unsubscribe := s.appBroadcaster.Subscribe(events)
defer unsubscribe()
for {
select {
case event := <-events:
shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event)
if !shouldProcess {
continue
}
ts := time.Now().Format("2006-01-02T15:04:05.000Z")
sendIfPermitted(event.Application, event.Type, ts)
sendIfPermitted(event.Application, event.Type, ts, ignoreResourceCache)
case <-stream.Context().Done():
return nil
}
Expand Down
38 changes: 27 additions & 11 deletions server/application/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ func (s *applicationEventReporter) streamApplicationEvents(
es *events.EventSource,
stream events.Eventing_StartEventSourceServer,
ts string,
ignoreResourceCache bool,
) error {
var (
logCtx = log.WithField("app", a.Name)
)

logCtx.Info("streaming application events")
logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events")

appTree, err := s.server.getAppResources(ctx, a)
if err != nil {
Expand All @@ -133,7 +134,7 @@ func (s *applicationEventReporter) streamApplicationEvents(

revisionMetadata, _ := s.getApplicationHistoryRevisionDetails(ctx, a)

s.processResource(ctx, *rs, parentApplicationEntity, logCtx, ts, desiredManifests, stream, appTree, es, manifestGenErr, a, revisionMetadata)
s.processResource(ctx, *rs, parentApplicationEntity, logCtx, ts, desiredManifests, stream, appTree, es, manifestGenErr, a, revisionMetadata, false)
} else {
// application events for child apps would be sent by its parent app
// as resource event
Expand Down Expand Up @@ -165,7 +166,7 @@ func (s *applicationEventReporter) streamApplicationEvents(
if isApp(rs) {
continue
}
s.processResource(ctx, rs, a, logCtx, ts, desiredManifests, stream, appTree, es, manifestGenErr, nil, revisionMetadata)
s.processResource(ctx, rs, a, logCtx, ts, desiredManifests, stream, appTree, es, manifestGenErr, nil, revisionMetadata, ignoreResourceCache)
}
return nil
}
Expand All @@ -183,13 +184,23 @@ func (s *applicationEventReporter) processResource(
manifestGenErr bool,
originalApplication *appv1.Application,
revisionMetadata *appv1.RevisionMetadata,
ignoreResourceCache bool,
) {
logCtx = logCtx.WithFields(log.Fields{
"gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind),
"resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name),
})

if !s.shouldSendResourceEvent(parentApplication, rs) {
if rs.Health == nil && rs.Status == appv1.SyncStatusCodeSynced {
// for resources without health status we need to add 'Healthy' status
// when they are synced because we might have sent an event with 'Missing'
// status earlier and they would be stuck in it if we don't switch to 'Healthy'
rs.Health = &appv1.HealthStatus{
Status: health.HealthStatusHealthy,
}
}

if !ignoreResourceCache && !s.shouldSendResourceEvent(parentApplication, rs) {
return
}

Expand Down Expand Up @@ -238,17 +249,17 @@ func (s *applicationEventReporter) processResource(
}
}

func (s *applicationEventReporter) shouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) bool {
func (s *applicationEventReporter) shouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool) {
logCtx := log.WithField("app", ae.Application.Name)

if ae.Type == watch.Deleted {
logCtx.Info("application deleted")
return true
return true, false
}

cachedApp, err := s.server.cache.GetLastApplicationEvent(&ae.Application)
if err != nil || cachedApp == nil {
return true
return true, false
}

cachedApp.Status.ReconciledAt = ae.Application.Status.ReconciledAt // ignore those in the diff
Expand All @@ -260,22 +271,27 @@ func (s *applicationEventReporter) shouldSendApplicationEvent(ae *appv1.Applicat
ae.Application.Status.Conditions[i].LastTransitionTime = nil
}

// check if application changed to healthy status
if ae.Application.Status.Health.Status == health.HealthStatusHealthy && cachedApp.Status.Health.Status != health.HealthStatusHealthy {
return true, true
}

if !reflect.DeepEqual(ae.Application.Spec, cachedApp.Spec) {
logCtx.Info("application spec changed")
return true
return true, false
}

if !reflect.DeepEqual(ae.Application.Status, cachedApp.Status) {
logCtx.Info("application status changed")
return true
return true, false
}

if !reflect.DeepEqual(ae.Application.Operation, cachedApp.Operation) {
logCtx.Info("application operation changed")
return true
return true, false
}

return false
return false, false
}

func isApp(rs appv1.ResourceStatus) bool {
Expand Down
2 changes: 1 addition & 1 deletion server/application/application_event_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func TestStreamApplicationEvent(t *testing.T) {
return nil
}

_ = eventReporter.streamApplicationEvents(context.Background(), app, &events.EventSource{Name: &name}, &MockEventing_StartEventSourceServer{}, "")
_ = eventReporter.streamApplicationEvents(context.Background(), app, &events.EventSource{Name: &name}, &MockEventing_StartEventSourceServer{}, "", false)
})

}
Expand Down

0 comments on commit ceac379

Please sign in to comment.