diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 1d1a1f0..d18da90 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -234,8 +234,10 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { } for _, batch := range batches { - b := batch - w.workqueue.Add(&b) + for _, b := range batch { + logger.Debugf(ctx, "Enqueuing batch with id: %v", b.GetID()) + w.workqueue.Add(b.GetID()) + } } return nil @@ -273,18 +275,29 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { case <-ctx.Done(): return nil default: - item, shutdown := w.workqueue.Get() + itemID, shutdown := w.workqueue.Get() if shutdown { + logger.Debugf(ctx, "Shutting down worker") return nil } t := w.metrics.SyncLatency.Start() - updatedBatch, err := w.syncCb(ctx, *item.(*Batch)) + logger.Debugf(ctx, "Syncing item with id [%v]", itemID) + item, ok := w.lruMap.Get(itemID) + if !ok { + logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) + t.Stop() + continue + } + updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{ + id: itemID.(ItemID), + item: item.(Item), + }}) // Since we create batches every time we sync, we will just remove the item from the queue here // regardless of whether it succeeded the sync or not. - w.workqueue.Forget(item) - w.workqueue.Done(item) + w.workqueue.Forget(itemID) + w.workqueue.Done(itemID) if err != nil { w.metrics.SyncErrors.Inc()