From bc466a1d608768ca67500a5997ea2b6542b3bc90 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 20 Sep 2023 13:28:36 -0700 Subject: [PATCH 01/18] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 1d1a1f0..3777065 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -235,7 +235,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { for _, batch := range batches { b := batch - w.workqueue.Add(&b) + w.workqueue.Add(b) } return nil From ef6b4ffa058e7ab166913a3fdf75ac62f016e2e0 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 20 Sep 2023 13:58:07 -0700 Subject: [PATCH 02/18] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 3777065..bb4c0dc 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -235,7 +235,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { for _, batch := range batches { b := batch - w.workqueue.Add(b) + w.workqueue.Add(b[0]) } return nil @@ -279,7 +279,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { } t := w.metrics.SyncLatency.Start() - updatedBatch, err := w.syncCb(ctx, *item.(*Batch)) + updatedBatch, err := w.syncCb(ctx, Batch{item.(ItemWrapper)}) // Since we create batches every time we sync, we will just remove the item from the queue here // regardless of whether it succeeded the sync or not. From c396231e4b4f3b3e193a729bc74c7a35f2eb6cde Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 20 Sep 2023 14:18:25 -0700 Subject: [PATCH 03/18] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index bb4c0dc..0b9a173 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -235,7 +235,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { for _, batch := range batches { b := batch - w.workqueue.Add(b[0]) + w.workqueue.Add(b[0].GetID()) } return nil @@ -273,12 +273,16 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { case <-ctx.Done(): return nil default: - item, shutdown := w.workqueue.Get() + itemId, shutdown := w.workqueue.Get() if shutdown { return nil } t := w.metrics.SyncLatency.Start() + item, ok := w.lruMap.Get(itemId) + if !ok { + return nil + } updatedBatch, err := w.syncCb(ctx, Batch{item.(ItemWrapper)}) // Since we create batches every time we sync, we will just remove the item from the queue here From 5a2399ff6a04059d49910db3add7c6f1b97c82c1 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 20 Sep 2023 15:55:42 -0700 Subject: [PATCH 04/18] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 0b9a173..9a74e8b 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -235,6 +235,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { for _, batch := range batches { b := batch + logger.Infof(ctx, "Enqueuing batch with id: %v", b[0].GetID()) w.workqueue.Add(b[0].GetID()) } @@ -281,6 +282,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { t := w.metrics.SyncLatency.Start() item, ok := w.lruMap.Get(itemId) if !ok { + logger.Infof(ctx, "item with id [%v] not found in cache", itemId) return nil } updatedBatch, err := w.syncCb(ctx, Batch{item.(ItemWrapper)}) From bbce9128104e9d3d1cd4965e380767bdea556e9e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 20 Sep 2023 16:56:22 -0700 Subject: [PATCH 05/18] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 9a74e8b..1db3be1 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -285,7 +285,10 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { logger.Infof(ctx, "item with id [%v] not found in cache", itemId) return nil } - updatedBatch, err := w.syncCb(ctx, Batch{item.(ItemWrapper)}) + updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{ + id: itemId.(ItemID), + item: item.(Item), + }}) // Since we create batches every time we sync, we will just remove the item from the queue here // regardless of whether it succeeded the sync or not. From b09b9971501347039030b82e7725b060a3f8f64b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 20 Sep 2023 17:34:21 -0700 Subject: [PATCH 06/18] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 1db3be1..c8b38f6 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -235,7 +235,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { for _, batch := range batches { b := batch - logger.Infof(ctx, "Enqueuing batch with id: %v", b[0].GetID()) + logger.Debugf(ctx, "Enqueuing batch with id: %v", b[0].GetID()) w.workqueue.Add(b[0].GetID()) } @@ -282,7 +282,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { t := w.metrics.SyncLatency.Start() item, ok := w.lruMap.Get(itemId) if !ok { - logger.Infof(ctx, "item with id [%v] not found in cache", itemId) + logger.Debugf(ctx, "item with id [%v] not found in cache", itemId) return nil } updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{ From d3e61edf719821f76e47532b4a110320d3e3c1c6 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 25 Sep 2023 15:58:03 -0700 Subject: [PATCH 07/18] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index c8b38f6..ef3ffb3 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -234,9 +234,10 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { } for _, batch := range batches { - b := batch - logger.Debugf(ctx, "Enqueuing batch with id: %v", b[0].GetID()) - w.workqueue.Add(b[0].GetID()) + for _, b := range batch { + logger.Debugf(ctx, "Enqueuing batch with id: %v", b.GetID()) + w.workqueue.Add(b.GetID()) + } } return nil @@ -274,26 +275,26 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { case <-ctx.Done(): return nil default: - itemId, shutdown := w.workqueue.Get() + itemID, shutdown := w.workqueue.Get() if shutdown { return nil } t := w.metrics.SyncLatency.Start() - item, ok := w.lruMap.Get(itemId) + item, ok := w.lruMap.Get(itemID) if !ok { - logger.Debugf(ctx, "item with id [%v] not found in cache", itemId) + logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) return nil } updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{ - id: itemId.(ItemID), + id: itemID.(ItemID), item: item.(Item), }}) // Since we create batches every time we sync, we will just remove the item from the queue here // regardless of whether it succeeded the sync or not. - w.workqueue.Forget(item) - w.workqueue.Done(item) + w.workqueue.Forget(itemID) + w.workqueue.Done(itemID) if err != nil { w.metrics.SyncErrors.Inc() From 9910612c9ede3bef42e8a0362e63b0b6858acb29 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 25 Sep 2023 16:33:04 -0700 Subject: [PATCH 08/18] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index ef3ffb3..7f058e8 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -235,7 +235,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { for _, batch := range batches { for _, b := range batch { - logger.Debugf(ctx, "Enqueuing batch with id: %v", b.GetID()) + logger.Infof(ctx, "Enqueuing batch with id: %v", b.GetID()) w.workqueue.Add(b.GetID()) } } @@ -277,10 +277,12 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { default: itemID, shutdown := w.workqueue.Get() if shutdown { + logger.Infof(ctx, "Shutting down worker") return nil } t := w.metrics.SyncLatency.Start() + logger.Infof(ctx, "Syncing item with id [%v]", itemID) item, ok := w.lruMap.Get(itemID) if !ok { logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) From 7b775a34c7ad8e495fd68cc41724ecb2899b3a26 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 25 Sep 2023 16:58:16 -0700 Subject: [PATCH 09/18] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 7f058e8..89ea1c1 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -235,7 +235,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { for _, batch := range batches { for _, b := range batch { - logger.Infof(ctx, "Enqueuing batch with id: %v", b.GetID()) + logger.Debugf(ctx, "Enqueuing batch with id: %v", b.GetID()) w.workqueue.Add(b.GetID()) } } @@ -277,12 +277,12 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { default: itemID, shutdown := w.workqueue.Get() if shutdown { - logger.Infof(ctx, "Shutting down worker") + logger.Debugf(ctx, "Shutting down worker") return nil } t := w.metrics.SyncLatency.Start() - logger.Infof(ctx, "Syncing item with id [%v]", itemID) + logger.Debugf(ctx, "Syncing item with id [%v]", itemID) item, ok := w.lruMap.Get(itemID) if !ok { logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) From a923364d54e4bf3f1a6a9cf8d58759bedcbbae39 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 25 Sep 2023 17:18:33 -0700 Subject: [PATCH 10/18] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 89ea1c1..427b8b1 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -214,6 +214,8 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { if w.toDelete.Contains(k) { w.lruMap.Remove(k) w.toDelete.Remove(k) + w.workqueue.Forget(k) + w.workqueue.Done(k) continue } // If not ok, it means evicted between the item was evicted between getting the keys and this update loop @@ -293,11 +295,6 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { item: item.(Item), }}) - // Since we create batches every time we sync, we will just remove the item from the queue here - // regardless of whether it succeeded the sync or not. - w.workqueue.Forget(itemID) - w.workqueue.Done(itemID) - if err != nil { w.metrics.SyncErrors.Inc() logger.Errorf(ctx, "failed to get latest copy of a batch. Error: %v", err) @@ -315,6 +312,8 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { w.toDelete.Range(func(key interface{}) bool { w.lruMap.Remove(key) w.toDelete.Remove(key) + w.workqueue.Forget(itemID) + w.workqueue.Done(itemID) return true }) From 23967d5109a641a78d794a2414a6c5a21cc8578b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 25 Sep 2023 19:59:17 -0700 Subject: [PATCH 11/18] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 427b8b1..b0cc1c9 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -214,8 +214,6 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { if w.toDelete.Contains(k) { w.lruMap.Remove(k) w.toDelete.Remove(k) - w.workqueue.Forget(k) - w.workqueue.Done(k) continue } // If not ok, it means evicted between the item was evicted between getting the keys and this update loop @@ -295,6 +293,9 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { item: item.(Item), }}) + w.workqueue.Forget(itemID) + w.workqueue.Done(itemID) + if err != nil { w.metrics.SyncErrors.Inc() logger.Errorf(ctx, "failed to get latest copy of a batch. Error: %v", err) @@ -312,8 +313,6 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { w.toDelete.Range(func(key interface{}) bool { w.lruMap.Remove(key) w.toDelete.Remove(key) - w.workqueue.Forget(itemID) - w.workqueue.Done(itemID) return true }) From bfcb747f9443d7507568b1931046690fa9a458cf Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 25 Sep 2023 23:20:18 -0700 Subject: [PATCH 12/18] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index b0cc1c9..6545070 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -293,8 +293,8 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { item: item.(Item), }}) - w.workqueue.Forget(itemID) - w.workqueue.Done(itemID) + w.workqueue.Forget(item) + w.workqueue.Done(item) if err != nil { w.metrics.SyncErrors.Inc() From 80d4cd89b5087a9c8e3f2e28cdf7912a2f72dbb3 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 25 Sep 2023 23:44:00 -0700 Subject: [PATCH 13/18] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 6545070..b0cc1c9 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -293,8 +293,8 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { item: item.(Item), }}) - w.workqueue.Forget(item) - w.workqueue.Done(item) + w.workqueue.Forget(itemID) + w.workqueue.Done(itemID) if err != nil { w.metrics.SyncErrors.Inc() From a3a8536b54cc61790fef11f6921126f6a913ca0f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 26 Sep 2023 00:24:44 -0700 Subject: [PATCH 14/18] test Signed-off-by: Kevin Su --- cache/auto_refresh.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index b0cc1c9..ef6ed2b 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -286,7 +286,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { item, ok := w.lruMap.Get(itemID) if !ok { logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) - return nil + continue } updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{ id: itemID.(ItemID), From 5ea45afea763bc466baf94514f992d4cfab7c674 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 26 Sep 2023 00:30:21 -0700 Subject: [PATCH 15/18] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index ef6ed2b..6907642 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -286,6 +286,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { item, ok := w.lruMap.Get(itemID) if !ok { logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) + t.Stop() continue } updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{ From d4ae3ad3dd15d450079e4f19d78a4ef6cd705199 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 26 Sep 2023 00:31:15 -0700 Subject: [PATCH 16/18] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 6907642..d18da90 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -294,6 +294,8 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { item: item.(Item), }}) + // Since we create batches every time we sync, we will just remove the item from the queue here + // regardless of whether it succeeded the sync or not. w.workqueue.Forget(itemID) w.workqueue.Done(itemID) From d612b68f12633a1455e9988c04c3593fb1edb087 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 26 Sep 2023 16:11:54 -0700 Subject: [PATCH 17/18] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index d18da90..2fd072d 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -313,12 +313,6 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { } } - w.toDelete.Range(func(key interface{}) bool { - w.lruMap.Remove(key) - w.toDelete.Remove(key) - return true - }) - t.Stop() } } From 07e6f2229cf21cb07145a0e2d10b0c41503d1cca Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 26 Sep 2023 16:23:05 -0700 Subject: [PATCH 18/18] nit Signed-off-by: Kevin Su --- cache/auto_refresh.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 2fd072d..d18da90 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -313,6 +313,12 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { } } + w.toDelete.Range(func(key interface{}) bool { + w.lruMap.Remove(key) + w.toDelete.Remove(key) + return true + }) + t.Stop() } }