diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go
index d4bed3e..1d1a1f0 100644
--- a/cache/auto_refresh.go
+++ b/cache/auto_refresh.go
@@ -56,7 +56,9 @@ type metrics struct {
 	scope promutils.Scope
 }
 
-type Item interface{}
+type Item interface {
+	IsTerminal() bool
+}
 
 // Items are wrapped inside an ItemWrapper to be stored in the cache.
 type ItemWrapper interface {
@@ -164,7 +166,7 @@ func (w *autoRefresh) Start(ctx context.Context) error {
 	go wait.Until(func() {
 		err := w.enqueueBatches(enqueueCtx)
 		if err != nil {
-			logger.Errorf(enqueueCtx, "Failed to sync. Error: %v", err)
+			logger.Errorf(enqueueCtx, "Failed to enqueue. Error: %v", err)
 		}
 	}, w.syncPeriod, enqueueCtx.Done())
 
@@ -209,13 +211,20 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
 	snapshot := make([]ItemWrapper, 0, len(keys))
 	for _, k := range keys {
+		if w.toDelete.Contains(k) {
+			w.lruMap.Remove(k)
+			w.toDelete.Remove(k)
+			continue
+		}
 		// If not ok, it means the item was evicted between getting the keys and this update loop,
 		// which is fine; we can just ignore it.
-		if value, ok := w.lruMap.Peek(k); ok && !w.toDelete.Contains(k) {
-			snapshot = append(snapshot, itemWrapper{
-				id:   k.(ItemID),
-				item: value.(Item),
-			})
+		if value, ok := w.lruMap.Peek(k); ok {
+			// Skip terminal items: they can no longer change, so re-syncing them is wasted work.
+			if item, ok := value.(Item); !ok || !item.IsTerminal() {
+				snapshot = append(snapshot, itemWrapper{
+					id:   k.(ItemID),
+					item: value.(Item),
+				})
+			}
 		}
 	}
diff --git a/cache/auto_refresh_example_test.go b/cache/auto_refresh_example_test.go
index 1f0bd5f..84eb00d 100644
--- a/cache/auto_refresh_example_test.go
+++ b/cache/auto_refresh_example_test.go
@@ -26,6 +26,10 @@ type ExampleCacheItem struct {
 	id string
 }
 
+func (e *ExampleCacheItem) IsTerminal() bool {
+	return e.status == ExampleStatusSucceeded
+}
+
 func (e *ExampleCacheItem) ID() string {
 	return e.id
 }
diff --git a/cache/auto_refresh_test.go b/cache/auto_refresh_test.go
index 301b5c8..b27070b 100644
--- a/cache/auto_refresh_test.go
+++ b/cache/auto_refresh_test.go
@@ -25,6 +25,18 @@ type fakeCacheItem struct {
 	val int
 }
 
+func (f fakeCacheItem) IsTerminal() bool {
+	return false
+}
+
+type terminalCacheItem struct {
+	val int
+}
+
+func (t terminalCacheItem) IsTerminal() bool {
+	return true
+}
+
 func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) {
 	items := make([]ItemSyncResponse, 0, len(batch))
 	for _, obj := range batch {
@@ -46,7 +58,11 @@ func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) {
 	return items, nil
 }
 
-func TestCacheTwo(t *testing.T) {
+func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) {
+	panic("This should never be called")
+}
+
+func TestCacheThree(t *testing.T) {
 	testResyncPeriod := time.Millisecond
 	rateLimiter := workqueue.DefaultControllerRateLimiter()
 
@@ -104,6 +120,28 @@
 
 		cancel()
 	})
+
+	t.Run("Enqueue nothing", func(t *testing.T) {
+		cache, err := NewAutoRefreshCache("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope())
+		assert.NoError(t, err)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		assert.NoError(t, cache.Start(ctx))
+
+		// Create ten items in the cache
+		for i := 1; i <= 10; i++ {
+			_, err := cache.GetOrCreate(fmt.Sprintf("%d", i), terminalCacheItem{
+				val: 0,
+			})
+			assert.NoError(t, err)
+		}
+
+		// Wait half a second for all resync periods to complete.
+		// If the cache tries to enqueue any item, syncTerminalItem will panic.
+		time.Sleep(500 * time.Millisecond)
+
+		cancel()
+	})
 }
 
 func TestQueueBuildUp(t *testing.T) {
@@ -134,7 +172,7 @@
 	defer cancelNow()
 
 	for i := 0; i < size; i++ {
-		_, err := cache.GetOrCreate(strconv.Itoa(i), "test")
+		_, err := cache.GetOrCreate(strconv.Itoa(i), fakeCacheItem{val: 3})
 		assert.NoError(t, err)
 	}
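For reviewers, a minimal runnable sketch of what the new contract looks like from a consumer's point of view. The workflowItem type and its phase strings are hypothetical, invented only for illustration; the loop mirrors the shape of the filter added to enqueueBatches above, not the cache's actual internals.

package main

import "fmt"

// Item mirrors the interface change in cache/auto_refresh.go: cached values
// must now report whether they have reached a terminal (unchanging) state.
type Item interface {
	IsTerminal() bool
}

// workflowItem is a hypothetical consumer type, used only for illustration.
type workflowItem struct {
	phase string
}

// IsTerminal returns true once the item can no longer change, which tells the
// auto-refresh loop to stop re-enqueueing it for sync.
func (w workflowItem) IsTerminal() bool {
	return w.phase == "Succeeded" || w.phase == "Failed"
}

func main() {
	items := map[string]Item{
		"running":   workflowItem{phase: "Running"},
		"succeeded": workflowItem{phase: "Succeeded"},
	}

	// Same shape as the filter in enqueueBatches: terminal items stay readable
	// in the cache but are never snapshotted for another sync round.
	for id, item := range items {
		if !item.IsTerminal() {
			fmt.Println("would enqueue:", id)
		}
	}
}

Note the design choice the "Enqueue nothing" test pins down: a terminal item is deliberately left in the LRU (reads still serve it) rather than deleted; it simply drops out of the periodic sync workload.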