From 497264ab01e0ab5e3878ed22dbbefd30855bda0f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 28 Aug 2023 19:43:53 -0700 Subject: [PATCH 01/14] wip Signed-off-by: Kevin Su --- cache/auto_refresh.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index d4bed3e..b0851b8 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -265,6 +265,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { return nil default: item, shutdown := w.workqueue.Get() + logger.Infof(ctx, "Got item from workqueue: %v", (*item.(*Batch))[0].GetID()) if shutdown { return nil } From ebd598ab328d671cded2a10877e57ba9558c5fe1 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 7 Sep 2023 00:30:23 -0700 Subject: [PATCH 02/14] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index b0851b8..d4a488a 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -56,7 +56,9 @@ type metrics struct { scope promutils.Scope } -type Item interface{} +type Item interface { + isTerminal() bool +} // Items are wrapped inside an ItemWrapper to be stored in the cache. type ItemWrapper interface { @@ -164,7 +166,7 @@ func (w *autoRefresh) Start(ctx context.Context) error { go wait.Until(func() { err := w.enqueueBatches(enqueueCtx) if err != nil { - logger.Errorf(enqueueCtx, "Failed to sync. Error: %v", err) + logger.Errorf(enqueueCtx, "Failed to enqueue. Error: %v", err) } }, w.syncPeriod, enqueueCtx.Done()) @@ -265,11 +267,16 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { return nil default: item, shutdown := w.workqueue.Get() - logger.Infof(ctx, "Got item from workqueue: %v", (*item.(*Batch))[0].GetID()) + batch := (*item.(*Batch))[0] + logger.Infof(ctx, "Got item from workqueue: %v", batch.GetID()) if shutdown { return nil } - + if batch.GetItem().isTerminal() { + w.workqueue.Forget(item) + w.workqueue.Done(item) + continue + } t := w.metrics.SyncLatency.Start() updatedBatch, err := w.syncCb(ctx, *item.(*Batch)) @@ -297,7 +304,6 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { w.toDelete.Remove(key) return true }) - t.Stop() } } From 15c45688e2bb3ccaba4c50500fb761154a9f3cbe Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 7 Sep 2023 00:41:25 -0700 Subject: [PATCH 03/14] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index d4a488a..3661a61 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -57,7 +57,7 @@ type metrics struct { } type Item interface { - isTerminal() bool + IsTerminal() bool } // Items are wrapped inside an ItemWrapper to be stored in the cache. From f4ba4f12d7e1a5b7b850fa86f77b941983e0d59f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 8 Sep 2023 16:08:16 -0700 Subject: [PATCH 04/14] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 3661a61..dc71180 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -81,6 +81,9 @@ const ( // The item returned has been updated and should be updated in the cache Update + + // The item should be removed from the cache + Delete ) // SyncFunc func type. Your implementation of this function for your cache instance is responsible for returning @@ -272,7 +275,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { if shutdown { return nil } - if batch.GetItem().isTerminal() { + if batch.GetItem().IsTerminal() { w.workqueue.Forget(item) w.workqueue.Done(item) continue From 0ff0904266cc020cdddbddbe42b992bd3c3ac273 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 9 Sep 2023 14:45:53 -0700 Subject: [PATCH 05/14] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index dc71180..5d24140 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -216,7 +216,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { for _, k := range keys { // If not ok, it means evicted between the item was evicted between getting the keys and this update loop // which is fine, we can just ignore. - if value, ok := w.lruMap.Peek(k); ok && !w.toDelete.Contains(k) { + if value, ok := w.lruMap.Peek(k); ok && !w.toDelete.Contains(k) && !value.(Item).IsTerminal() { snapshot = append(snapshot, itemWrapper{ id: k.(ItemID), item: value.(Item), @@ -275,11 +275,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { if shutdown { return nil } - if batch.GetItem().IsTerminal() { - w.workqueue.Forget(item) - w.workqueue.Done(item) - continue - } + t := w.metrics.SyncLatency.Start() updatedBatch, err := w.syncCb(ctx, *item.(*Batch)) From 93a067a1bad12699ac94cbed0853015c179bf515 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 12 Sep 2023 14:04:46 -0700 Subject: [PATCH 06/14] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 3 +-- utils/auto_refresh_cache.go | 3 --- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 5d24140..96adddd 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -270,8 +270,6 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { return nil default: item, shutdown := w.workqueue.Get() - batch := (*item.(*Batch))[0] - logger.Infof(ctx, "Got item from workqueue: %v", batch.GetID()) if shutdown { return nil } @@ -303,6 +301,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { w.toDelete.Remove(key) return true }) + t.Stop() } } diff --git a/utils/auto_refresh_cache.go b/utils/auto_refresh_cache.go index 0491ba8..fbd81c8 100644 --- a/utils/auto_refresh_cache.go +++ b/utils/auto_refresh_cache.go @@ -42,9 +42,6 @@ const ( // The item returned has been updated and should be updated in the cache Update - - // The item should be removed from the cache - Delete ) // CacheSyncItem is a func type. Your implementation of this function for your cache instance is responsible for returning From a8f2b07cd8434b11d9aef9ff7e6d04f7ded4f838 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 12 Sep 2023 14:05:22 -0700 Subject: [PATCH 07/14] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 3 --- utils/auto_refresh_cache.go | 3 +++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 96adddd..0037eba 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -81,9 +81,6 @@ const ( // The item returned has been updated and should be updated in the cache Update - - // The item should be removed from the cache - Delete ) // SyncFunc func type. Your implementation of this function for your cache instance is responsible for returning diff --git a/utils/auto_refresh_cache.go b/utils/auto_refresh_cache.go index fbd81c8..0491ba8 100644 --- a/utils/auto_refresh_cache.go +++ b/utils/auto_refresh_cache.go @@ -42,6 +42,9 @@ const ( // The item returned has been updated and should be updated in the cache Update + + // The item should be removed from the cache + Delete ) // CacheSyncItem is a func type. Your implementation of this function for your cache instance is responsible for returning From 1719f002f67564095f7d40a5b019862260900f33 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 12 Sep 2023 15:55:42 -0700 Subject: [PATCH 08/14] nit Signed-off-by: Kevin Su --- cache/auto_refresh_example_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cache/auto_refresh_example_test.go b/cache/auto_refresh_example_test.go index 1f0bd5f..84eb00d 100644 --- a/cache/auto_refresh_example_test.go +++ b/cache/auto_refresh_example_test.go @@ -26,6 +26,10 @@ type ExampleCacheItem struct { id string } +func (e *ExampleCacheItem) IsTerminal() bool { + return e.status == ExampleStatusSucceeded +} + func (e *ExampleCacheItem) ID() string { return e.id } From 3a8e289cba87f7a3de2f2abd11ba99ce8588d358 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 12 Sep 2023 16:59:38 -0700 Subject: [PATCH 09/14] add tests Signed-off-by: Kevin Su --- cache/auto_refresh_test.go | 40 +++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/cache/auto_refresh_test.go b/cache/auto_refresh_test.go index 301b5c8..bc18290 100644 --- a/cache/auto_refresh_test.go +++ b/cache/auto_refresh_test.go @@ -25,6 +25,18 @@ type fakeCacheItem struct { val int } +func (f fakeCacheItem) IsTerminal() bool { + return false +} + +type terminalCacheItem struct { + val int +} + +func (t terminalCacheItem) IsTerminal() bool { + return true +} + func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) { items := make([]ItemSyncResponse, 0, len(batch)) for _, obj := range batch { @@ -46,6 +58,10 @@ func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) { return items, nil } +func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) { + panic("This should never be called") +} + func TestCacheTwo(t *testing.T) { testResyncPeriod := time.Millisecond rateLimiter := workqueue.DefaultControllerRateLimiter() @@ -104,6 +120,28 @@ func TestCacheTwo(t *testing.T) { cancel() }) + + t.Run("Enqueue nothing", func(t *testing.T) { + cache, err := NewAutoRefreshCache("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope()) + assert.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + assert.NoError(t, cache.Start(ctx)) + + // Create ten items in the cache + for i := 1; i <= 10; i++ { + _, err := cache.GetOrCreate(fmt.Sprintf("%d", i), terminalCacheItem{ + val: 0, + }) + assert.NoError(t, err) + } + + // Wait half a second for all resync periods to complete + // If the cache tries to enqueue the item, a panic will be thrown. + time.Sleep(500 * time.Millisecond) + + cancel() + }) } func TestQueueBuildUp(t *testing.T) { @@ -134,7 +172,7 @@ func TestQueueBuildUp(t *testing.T) { defer cancelNow() for i := 0; i < size; i++ { - _, err := cache.GetOrCreate(strconv.Itoa(i), "test") + _, err := cache.GetOrCreate(strconv.Itoa(i), fakeCacheItem{val: 3}) assert.NoError(t, err) } From 8a2b71ed12075db06561ecde206125112826a144 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 12 Sep 2023 17:06:30 -0700 Subject: [PATCH 10/14] nit Signed-off-by: Kevin Su --- cache/auto_refresh_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cache/auto_refresh_test.go b/cache/auto_refresh_test.go index bc18290..b27070b 100644 --- a/cache/auto_refresh_test.go +++ b/cache/auto_refresh_test.go @@ -62,7 +62,7 @@ func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error panic("This should never be called") } -func TestCacheTwo(t *testing.T) { +func TestCacheThree(t *testing.T) { testResyncPeriod := time.Millisecond rateLimiter := workqueue.DefaultControllerRateLimiter() From 77b57fb838454a83ff9b313edd23a3580b894cb8 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 13 Sep 2023 10:54:49 -0700 Subject: [PATCH 11/14] remove item if it's ready to be deleted Signed-off-by: Kevin Su --- cache/auto_refresh.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 0037eba..a2a0a95 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -211,9 +211,14 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { snapshot := make([]ItemWrapper, 0, len(keys)) for _, k := range keys { + if w.toDelete.Contains(k) { + w.lruMap.Remove(k) + w.toDelete.Remove(k) + continue + } // If not ok, it means evicted between the item was evicted between getting the keys and this update loop // which is fine, we can just ignore. - if value, ok := w.lruMap.Peek(k); ok && !w.toDelete.Contains(k) && !value.(Item).IsTerminal() { + if value, ok := w.lruMap.Peek(k); ok && !value.(Item).IsTerminal() { snapshot = append(snapshot, itemWrapper{ id: k.(ItemID), item: value.(Item), From 6b69b970ccb0910d747c041f2acb4f74e7d7abf8 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 13 Sep 2023 16:05:23 -0700 Subject: [PATCH 12/14] type assertion Signed-off-by: Kevin Su --- cache/auto_refresh.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index a2a0a95..53d89f4 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -218,11 +218,13 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { } // If not ok, it means evicted between the item was evicted between getting the keys and this update loop // which is fine, we can just ignore. - if value, ok := w.lruMap.Peek(k); ok && !value.(Item).IsTerminal() { - snapshot = append(snapshot, itemWrapper{ - id: k.(ItemID), - item: value.(Item), - }) + if value, ok := w.lruMap.Peek(k); ok { + if item, ok := value.(Item); ok && item.IsTerminal() { + snapshot = append(snapshot, itemWrapper{ + id: k.(ItemID), + item: value.(Item), + }) + } } } From 699a936c02dd5b46cf642ae4783394660caec982 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 13 Sep 2023 16:07:22 -0700 Subject: [PATCH 13/14] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 53d89f4..c786de6 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -219,7 +219,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { // If not ok, it means evicted between the item was evicted between getting the keys and this update loop // which is fine, we can just ignore. if value, ok := w.lruMap.Peek(k); ok { - if item, ok := value.(Item); ok && item.IsTerminal() { + if item, ok := value.(Item); ok && !item.IsTerminal() { snapshot = append(snapshot, itemWrapper{ id: k.(ItemID), item: value.(Item), From 15416e03f5a287e524d7ae63207e04e402777ea4 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 13 Sep 2023 16:08:30 -0700 Subject: [PATCH 14/14] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index c786de6..1d1a1f0 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -219,7 +219,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { // If not ok, it means evicted between the item was evicted between getting the keys and this update loop // which is fine, we can just ignore. if value, ok := w.lruMap.Peek(k); ok { - if item, ok := value.(Item); ok && !item.IsTerminal() { + if item, ok := value.(Item); !ok || (ok && !item.IsTerminal()) { snapshot = append(snapshot, itemWrapper{ id: k.(ItemID), item: value.(Item),