Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add item ID to the workqueue instead #165

Closed
wants to merge 18 commits into from
15 changes: 12 additions & 3 deletions cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@

for _, batch := range batches {
b := batch
w.workqueue.Add(&b)
logger.Debugf(ctx, "Enqueuing batch with id: %v", b[0].GetID())
w.workqueue.Add(b[0].GetID())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that batches only have one item... I don't think this is true in all usage of AutoRefresh cache... if it's, I would rather deprecate the whole batching mechanism...
This implementation won't work for any batches > 1 item.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In flytepropeller, batches always only have one item.

func SingleItemBatches(_ context.Context, snapshot []ItemWrapper) (batches []Batch, err error) {
res := make([]Batch, 0, len(snapshot))
for _, item := range snapshot {
res = append(res, Batch{item})
}
return res, nil
}

Will clean up the code.

}

return nil
Expand Down Expand Up @@ -273,13 +274,21 @@
case <-ctx.Done():
return nil
default:
item, shutdown := w.workqueue.Get()
itemId, shutdown := w.workqueue.Get()

Check failure on line 277 in cache/auto_refresh.go

View workflow job for this annotation

GitHub Actions / Lint / Run Lint

var `itemId` should be `itemID` (golint)
if shutdown {
return nil
}

t := w.metrics.SyncLatency.Start()
updatedBatch, err := w.syncCb(ctx, *item.(*Batch))
item, ok := w.lruMap.Get(itemId)
if !ok {
logger.Debugf(ctx, "item with id [%v] not found in cache", itemId)
return nil
}
updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{
id: itemId.(ItemID),
item: item.(Item),
}})

// Since we create batches every time we sync, we will just remove the item from the queue here
// regardless of whether it succeeded the sync or not.
Expand Down
Loading