diff --git a/go/tasks/pluginmachinery/internal/webapi/cache.go b/go/tasks/pluginmachinery/internal/webapi/cache.go index 856e2104b..f3d1df1bf 100644 --- a/go/tasks/pluginmachinery/internal/webapi/cache.go +++ b/go/tasks/pluginmachinery/internal/webapi/cache.go @@ -78,31 +78,38 @@ func (q *ResourceCache) SyncResource(ctx context.Context, batch cache.Batch) ( logger.Debugf(ctx, "Sync loop - processing resource with cache key [%s]", resource.GetID()) - if cacheItem.SyncFailureCount > q.cfg.MaxSystemFailures { - logger.Infof(ctx, "Sync loop - Item with key [%v] has failed to sync [%v] time(s). More than the allowed [%v] time(s). Marking as failure.", - cacheItem.SyncFailureCount, q.cfg.MaxSystemFailures) - cacheItem.State.Phase = PhaseSystemFailure - } - if cacheItem.State.Phase.IsTerminal() { logger.Debugf(ctx, "Sync loop - resource cache key [%v] in terminal state [%s]", resource.GetID()) - resp = append(resp, cache.ItemSyncResponse{ ID: resource.GetID(), - Item: resource.GetItem(), + Item: cacheItem, Action: cache.Unchanged, }) continue } + if cacheItem.SyncFailureCount > q.cfg.MaxSystemFailures { + logger.Debugf(ctx, "Sync loop - Item with key [%v] has failed to sync [%v] time(s). More than the allowed [%v] time(s). Marking as failure.", + cacheItem.SyncFailureCount, q.cfg.MaxSystemFailures) + cacheItem.State.Phase = PhaseSystemFailure + resp = append(resp, cache.ItemSyncResponse{ + ID: resource.GetID(), + Item: cacheItem, + Action: cache.Update, + }) + + continue + } + // Get an updated status logger.Debugf(ctx, "Querying AsyncPlugin for %s", resource.GetID()) newResource, err := q.client.Get(ctx, newPluginContext(cacheItem.ResourceMeta, cacheItem.Resource, "", nil)) if err != nil { logger.Infof(ctx, "Error retrieving resource [%s]. Error: %v", resource.GetID(), err) cacheItem.SyncFailureCount++ + cacheItem.ErrorMessage = err.Error() // Make sure we don't return nil for the first argument, because that deletes it from the cache. resp = append(resp, cache.ItemSyncResponse{ diff --git a/go/tasks/pluginmachinery/internal/webapi/cache_test.go b/go/tasks/pluginmachinery/internal/webapi/cache_test.go index 49506b530..7d664baa7 100644 --- a/go/tasks/pluginmachinery/internal/webapi/cache_test.go +++ b/go/tasks/pluginmachinery/internal/webapi/cache_test.go @@ -67,6 +67,36 @@ func TestResourceCache_SyncResource(t *testing.T) { assert.Equal(t, cacheItem, newCacheItem[0].Item) }) + t.Run("Retry limit exceeded", func(t *testing.T) { + mockCache := &cacheMocks.AutoRefresh{} + mockClient := &mocks.Client{} + + q := ResourceCache{ + AutoRefresh: mockCache, + client: mockClient, + cfg: webapi.CachingConfig{ + MaxSystemFailures: 2, + }, + } + + cacheItem := CacheItem{ + State: State{ + SyncFailureCount: 5, + ErrorMessage: "some error", + }, + } + + iw := &cacheMocks.ItemWrapper{} + iw.OnGetItem().Return(cacheItem) + iw.OnGetID().Return("some-id") + + newCacheItem, err := q.SyncResource(ctx, []cache.ItemWrapper{iw}) + assert.NoError(t, err) + assert.Equal(t, cache.Update, newCacheItem[0].Action) + cacheItem.State.Phase = PhaseSystemFailure + assert.Equal(t, cacheItem, newCacheItem[0].Item) + }) + t.Run("move to success", func(t *testing.T) { mockCache := &cacheMocks.AutoRefresh{} mockClient := &mocks.Client{} diff --git a/go/tasks/pluginmachinery/internal/webapi/monitor.go b/go/tasks/pluginmachinery/internal/webapi/monitor.go index 6d3d4ac33..2dd03c11e 100644 --- a/go/tasks/pluginmachinery/internal/webapi/monitor.go +++ b/go/tasks/pluginmachinery/internal/webapi/monitor.go @@ -29,8 +29,16 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach errors.CacheFailed, "Failed to cast [%v]", cacheItem) } - // If the cache has not syncd yet, just return + // If the cache has not synced yet, just return if cacheItem.Resource == nil { + if cacheItem.Phase.IsTerminal() { + err = cache.DeleteDelayed(cacheItemID) + if err != nil { + logger.Errorf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v", + cacheItemID, err) + } + return state, core.PhaseInfoFailure(errors.CacheFailed, cacheItem.ErrorMessage, nil), nil + } return state, core.PhaseInfoRunning(0, nil), nil } @@ -54,7 +62,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach // Queue item for deletion in the cache. err = cache.DeleteDelayed(cacheItemID) if err != nil { - logger.Warnf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v", + logger.Errorf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v", cacheItemID, err) } } diff --git a/go/tasks/pluginmachinery/internal/webapi/state.go b/go/tasks/pluginmachinery/internal/webapi/state.go index 130c1affc..efce83b30 100644 --- a/go/tasks/pluginmachinery/internal/webapi/state.go +++ b/go/tasks/pluginmachinery/internal/webapi/state.go @@ -56,4 +56,7 @@ type State struct { // The time the execution first requests for an allocation token AllocationTokenRequestStartTime time.Time `json:"allocationTokenRequestStartTime,omitempty"` + + // ErrorMessage generated during cache synchronization. + ErrorMessage string `json:"error_message,omitempty"` }