Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Only enqueue non-terminal tasks #164

Merged
merged 14 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@
scope promutils.Scope
}

type Item interface{}
type Item interface {
IsTerminal() bool
}

// Items are wrapped inside an ItemWrapper to be stored in the cache.
type ItemWrapper interface {
Expand Down Expand Up @@ -164,7 +166,7 @@
go wait.Until(func() {
err := w.enqueueBatches(enqueueCtx)
if err != nil {
logger.Errorf(enqueueCtx, "Failed to sync. Error: %v", err)
logger.Errorf(enqueueCtx, "Failed to enqueue. Error: %v", err)

Check warning on line 169 in cache/auto_refresh.go

View check run for this annotation

Codecov / codecov/patch

cache/auto_refresh.go#L169

Added line #L169 was not covered by tests
}
}, w.syncPeriod, enqueueCtx.Done())

Expand Down Expand Up @@ -209,13 +211,20 @@

snapshot := make([]ItemWrapper, 0, len(keys))
for _, k := range keys {
if w.toDelete.Contains(k) {
w.lruMap.Remove(k)
w.toDelete.Remove(k)
continue

Check warning on line 217 in cache/auto_refresh.go

View check run for this annotation

Codecov / codecov/patch

cache/auto_refresh.go#L215-L217

Added lines #L215 - L217 were not covered by tests
}
// If not ok, it means evicted between the item was evicted between getting the keys and this update loop
// which is fine, we can just ignore.
if value, ok := w.lruMap.Peek(k); ok && !w.toDelete.Contains(k) {
snapshot = append(snapshot, itemWrapper{
id: k.(ItemID),
item: value.(Item),
})
if value, ok := w.lruMap.Peek(k); ok {
if item, ok := value.(Item); !ok || (ok && !item.IsTerminal()) {
snapshot = append(snapshot, itemWrapper{
id: k.(ItemID),
item: value.(Item),
})
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions cache/auto_refresh_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type ExampleCacheItem struct {
id string
}

func (e *ExampleCacheItem) IsTerminal() bool {
return e.status == ExampleStatusSucceeded
}

func (e *ExampleCacheItem) ID() string {
return e.id
}
Expand Down
42 changes: 40 additions & 2 deletions cache/auto_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ type fakeCacheItem struct {
val int
}

func (f fakeCacheItem) IsTerminal() bool {
return false
}

type terminalCacheItem struct {
val int
}

func (t terminalCacheItem) IsTerminal() bool {
return true
}

func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) {
items := make([]ItemSyncResponse, 0, len(batch))
for _, obj := range batch {
Expand All @@ -46,7 +58,11 @@ func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) {
return items, nil
}

func TestCacheTwo(t *testing.T) {
func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) {
panic("This should never be called")
}

func TestCacheThree(t *testing.T) {
testResyncPeriod := time.Millisecond
rateLimiter := workqueue.DefaultControllerRateLimiter()

Expand Down Expand Up @@ -104,6 +120,28 @@ func TestCacheTwo(t *testing.T) {

cancel()
})

t.Run("Enqueue nothing", func(t *testing.T) {
cache, err := NewAutoRefreshCache("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope())
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
assert.NoError(t, cache.Start(ctx))

// Create ten items in the cache
for i := 1; i <= 10; i++ {
_, err := cache.GetOrCreate(fmt.Sprintf("%d", i), terminalCacheItem{
val: 0,
})
assert.NoError(t, err)
}

// Wait half a second for all resync periods to complete
// If the cache tries to enqueue the item, a panic will be thrown.
time.Sleep(500 * time.Millisecond)

cancel()
})
}

func TestQueueBuildUp(t *testing.T) {
Expand Down Expand Up @@ -134,7 +172,7 @@ func TestQueueBuildUp(t *testing.T) {
defer cancelNow()

for i := 0; i < size; i++ {
_, err := cache.GetOrCreate(strconv.Itoa(i), "test")
_, err := cache.GetOrCreate(strconv.Itoa(i), fakeCacheItem{val: 3})
assert.NoError(t, err)
}

Expand Down
Loading