[BUG] Handle auto refresh cache race condition (flyteorg#5406)
* utilize auto refresh processing set with entry expiration

Signed-off-by: Paul Dittamo <[email protected]>

* add unit test

Signed-off-by: Paul Dittamo <[email protected]>

* update processing grace period to 5 sync periods

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
pvditt authored and robert-ulbrich-mercedes-benz committed Jul 2, 2024
1 parent e356a40 commit 47a221a
Showing 2 changed files with 39 additions and 6 deletions.
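
To make the diff below easier to follow: the fix replaces the plain processing set with a map from item ID to the time the item was marked as in flight, and treats entries older than a grace period (five sync periods in this commit) as stale, so an item stuck in "processing" because of the race no longer blocks re-enqueueing forever. The sketch below is a minimal, self-contained illustration of that pattern; the expiringSet type, its method names, and the fixed 50ms grace period are illustrative stand-ins for this explanation, not the flytestdlib API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// expiringSet tracks in-flight keys together with the time they were marked,
// so a key stuck in "processing" does not block re-enqueueing forever.
// Illustrative only; not the flytestdlib types.
type expiringSet struct {
	entries sync.Map      // key -> time.Time at which the key was marked
	grace   time.Duration // how long an entry may stay "processing" before it is considered stale
}

// mark records that key is now being processed.
func (s *expiringSet) mark(key interface{}) { s.entries.Store(key, time.Now()) }

// done removes key once processing has finished.
func (s *expiringSet) done(key interface{}) { s.entries.Delete(key) }

// inProcessing reports whether key is still considered in flight.
// Entries older than the grace period are evicted and reported as not
// processing, so the caller can safely enqueue the item again.
func (s *expiringSet) inProcessing(key interface{}) bool {
	v, ok := s.entries.Load(key)
	if !ok {
		return false
	}
	if t, ok := v.(time.Time); ok && time.Since(t) > s.grace {
		s.entries.Delete(key)
		return false
	}
	return true
}

func main() {
	s := &expiringSet{grace: 50 * time.Millisecond}

	s.mark("item-1")
	fmt.Println(s.inProcessing("item-1")) // true: freshly marked

	time.Sleep(60 * time.Millisecond)
	fmt.Println(s.inProcessing("item-1")) // false: grace period elapsed, stale entry evicted
}

The five-sync-period grace used in the commit trades off how quickly a genuinely stuck item is retried against the risk of double-processing an item whose sync is merely slow; any value comfortably larger than one sync period serves the same purpose.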
26 changes: 20 additions & 6 deletions flytestdlib/cache/auto_refresh.go
@@ -118,7 +118,7 @@ type autoRefresh struct {
 	lruMap *lru.Cache
 	// Items that are currently being processed are in the processing set.
 	// It will prevent the same item from being processed multiple times by different workers.
-	processing *syncSet
+	processing *sync.Map
 	toDelete *syncSet
 	syncPeriod time.Duration
 	workqueue workqueue.RateLimitingInterface
@@ -220,7 +220,7 @@ func (w *autoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) {
 	batch := make([]ItemWrapper, 0, 1)
 	batch = append(batch, itemWrapper{id: id, item: item})
 	w.workqueue.AddRateLimited(&batch)
-	w.processing.Insert(id)
+	w.processing.Store(id, time.Now())
 	return item, nil
 }
 
@@ -246,7 +246,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
 		// If not ok, it means evicted between the item was evicted between getting the keys and this update loop
 		// which is fine, we can just ignore.
 		if value, ok := w.lruMap.Peek(k); ok {
-			if item, ok := value.(Item); !ok || (ok && !item.IsTerminal() && !w.processing.Contains(k)) {
+			if item, ok := value.(Item); !ok || (ok && !item.IsTerminal() && !w.inProcessing(k)) {
 				snapshot = append(snapshot, itemWrapper{
 					id: k.(ItemID),
 					item: value.(Item),
@@ -264,7 +264,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
 		b := batch
 		w.workqueue.AddRateLimited(&b)
 		for i := 1; i < len(b); i++ {
-			w.processing.Insert(b[i].GetID())
+			w.processing.Store(b[i].GetID(), time.Now())
 		}
 	}
 
@@ -316,7 +316,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
 		newBatch := make(Batch, 0, len(*batch.(*Batch)))
 		for _, b := range *batch.(*Batch) {
 			itemID := b.GetID()
-			w.processing.Remove(itemID)
+			w.processing.Delete(itemID)
 			item, ok := w.lruMap.Get(itemID)
 			if !ok {
 				logger.Debugf(ctx, "item with id [%v] not found in cache", itemID)
@@ -359,6 +359,20 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
 	}
 }
 
+// Checks if the item is currently being processed and returns false if the item has been in processing for too long
+func (w *autoRefresh) inProcessing(key interface{}) bool {
+	item, found := w.processing.Load(key)
+	if found {
+		// handle potential race conditions where the item is in processing but not in the workqueue
+		if timeItem, ok := item.(time.Time); ok && time.Since(timeItem) > (w.syncPeriod*5) {
+			w.processing.Delete(key)
+			return false
+		}
+		return true
+	}
+	return false
+}
+
 // Instantiates a new AutoRefresh Cache that syncs items in batches.
 func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter,
 	resyncPeriod time.Duration, parallelizm, size int, scope promutils.Scope) (AutoRefresh, error) {
@@ -376,7 +376,7 @@ func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, sy
 		createBatchesCb: createBatches,
 		syncCb: syncCb,
 		lruMap: lruCache,
-		processing: newSyncSet(),
+		processing: &sync.Map{},
 		toDelete: newSyncSet(),
 		syncPeriod: resyncPeriod,
 		workqueue: workqueue.NewNamedRateLimitingQueue(syncRateLimiter, scope.CurrentScope()),
19 changes: 19 additions & 0 deletions flytestdlib/cache/auto_refresh_test.go
@@ -210,3 +210,22 @@ func TestQueueBuildUp(t *testing.T) {
 	time.Sleep(5 * time.Second)
 	assert.Equal(t, int32(size), syncCount.Load())
 }
+
+func TestInProcessing(t *testing.T) {
+
+	syncPeriod := time.Millisecond
+	cache := &autoRefresh{
+		processing: &sync.Map{},
+		syncPeriod: syncPeriod,
+	}
+
+	assert.False(t, cache.inProcessing("test"))
+
+	cache.processing.Store("test", time.Now())
+	assert.True(t, cache.inProcessing("test"))
+
+	cache.processing.Store("test1", time.Now().Add(syncPeriod*-11))
+	assert.False(t, cache.inProcessing("test1"))
+	_, found := cache.processing.Load("test1")
+	assert.False(t, found)
+}
