diff --git a/db/change_cache.go b/db/change_cache.go
index 83e0ce5a61..e4b581c919 100644
--- a/db/change_cache.go
+++ b/db/change_cache.go
@@ -16,7 +16,6 @@ import (
 	"errors"
 	"fmt"
 	"math"
-	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -126,6 +125,10 @@ func (entry *LogEntry) SetDeleted() {
 	entry.Flags |= channels.Deleted
 }
 
+func (entry *LogEntry) IsUnusedRange() bool {
+	return entry.DocID == "" && entry.EndSequence > 0
+}
+
 type LogEntries []*LogEntry
 
 // A priority-queue of LogEntries, kept ordered by increasing sequence #.
@@ -633,71 +636,23 @@ func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSe
 		// isn't possible under normal processing - unused sequence ranges will normally be moved
 		// from pending to skipped in their entirety, as it's the processing of the pending sequence
 		// *after* the range that triggers the range to be skipped. A partial range in skipped means
-		// an duplicate entry with a sequence within the bounds of the range was previously present
+		// a duplicate entry with a sequence within the bounds of the range was previously present
 		// in pending.
-		base.WarnfCtx(ctx, "unused sequence range of #%d to %d contains duplicate sequences", fromSequence, toSequence)
+		base.WarnfCtx(ctx, "unused sequence range of #%d to %d contains duplicate sequences, will be ignored", fromSequence, toSequence)
 	}
 	return allChangedChannels
 }
 
-// _pushRangeToPending will push a sequence range to pending logs. If pending has entries in it, we will check if
-// those entries are in the range and handle it, so we don't push duplicate sequences to pending
+// _pushRangeToPending will push a sequence range to pendingLogs
 func (c *changeCache) _pushRangeToPending(ctx context.Context, startSeq, endSeq uint64, timeReceived time.Time) {
-	if c.pendingLogs.Len() == 0 {
-		// push whole range & return early to avoid duplicate checks
-		entry := &LogEntry{
-			TimeReceived: timeReceived,
-			Sequence:     startSeq,
-			EndSequence:  endSeq,
-		}
-		heap.Push(&c.pendingLogs, entry)
-		return
-	}
-	// check for duplicate sequences between range and pending logs
-	// loop till we have processed unused sequence range (or until we
-	// have range of sequences that aren't present in pending list)
-	for startSeq <= endSeq {
-		i, found := sort.Find(c.pendingLogs.Len(), func(i int) int {
-			value := c.pendingLogs[i]
-			if value.Sequence > endSeq {
-				// range is less than current pending entry
-				return -1
-			}
-			if startSeq <= value.Sequence && endSeq >= value.Sequence {
-				// found pending entry that has duplicate entry between itself and unused range
-				return 0
-			}
-			// range is larger then current element
-			return 1
-		})
-		if found {
-			// grab pending entry at that index and process unused range between startSeq and pending entry.Sequence - 1
-			pendingEntry := c.pendingLogs[i]
-			base.DebugfCtx(ctx, base.KeyCache, "Ignoring duplicate of #%d (unusedSequence)", pendingEntry.Sequence)
-			entry := &LogEntry{
-				TimeReceived: timeReceived,
-				Sequence:     startSeq,
-				EndSequence:  pendingEntry.Sequence - 1,
-			}
-			heap.Push(&c.pendingLogs, entry)
-			// update start seq on range
-			startSeq = pendingEntry.Sequence + 1
-		} else {
-			// if range not found in pending then break from loop early
-			break
-		}
-	}
-	// push what's left of seq range
-	if startSeq <= endSeq {
-		entry := &LogEntry{
-			TimeReceived: timeReceived,
-			Sequence:     startSeq,
-			EndSequence:  endSeq,
-		}
-		heap.Push(&c.pendingLogs, entry)
-	}
+	entry := &LogEntry{
+		TimeReceived: timeReceived,
+		Sequence:     startSeq,
+		EndSequence:  endSeq,
+	}
+	heap.Push(&c.pendingLogs, entry)
 }
 
 // Process unused sequence notification. Extracts sequence from docID and sends to cache for buffering
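// Illustrative aside, not part of the diff: a standalone sketch of buffering an unused
// sequence range on a start-sequence-ordered heap, in the spirit of the simplified
// _pushRangeToPending above. The types here (rangeEntry, pendingHeap) are assumptions
// for the example, not the Sync Gateway LogEntry/LogPriorityQueue types.
package main

import (
	"container/heap"
	"fmt"
)

// rangeEntry mirrors the Sequence/EndSequence shape of a pending entry.
// EndSequence == 0 denotes a single (non-range) entry.
type rangeEntry struct {
	Sequence    uint64
	EndSequence uint64
}

// pendingHeap is a min-heap ordered by start sequence.
type pendingHeap []*rangeEntry

func (h pendingHeap) Len() int            { return len(h) }
func (h pendingHeap) Less(i, j int) bool  { return h[i].Sequence < h[j].Sequence }
func (h pendingHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *pendingHeap) Push(x interface{}) { *h = append(*h, x.(*rangeEntry)) }
func (h *pendingHeap) Pop() interface{} {
	old := *h
	n := len(old)
	entry := old[n-1]
	*h = old[:n-1]
	return entry
}

func main() {
	pending := &pendingHeap{}
	heap.Init(pending)
	// Ranges are pushed as-is; duplicate resolution is deferred until entries are popped.
	heap.Push(pending, &rangeEntry{Sequence: 14, EndSequence: 20})
	heap.Push(pending, &rangeEntry{Sequence: 2, EndSequence: 4})
	heap.Push(pending, &rangeEntry{Sequence: 9}) // single sequence
	for pending.Len() > 0 {
		e := heap.Pop(pending).(*rangeEntry)
		fmt.Printf("[%d,%d] ", e.Sequence, e.EndSequence)
	}
	fmt.Println() // prints: [2,4] [9,0] [14,20]
}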
@@ -885,16 +840,23 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
 		isNext = oldestPending.Sequence == c.nextSequence
 
 		if isNext {
-			heap.Pop(&c.pendingLogs)
+			oldestPending = c._popPendingLog(ctx)
 			changedChannels = changedChannels.UpdateWithSlice(c._addToCache(ctx, oldestPending))
+		} else if oldestPending.Sequence < c.nextSequence {
+			// oldest pending is lower than next sequence, should be ignored
+			base.InfofCtx(ctx, base.KeyCache, "Oldest entry in pending logs %v (%d, %d) is earlier than cache next sequence (%d), ignoring as sequence has already been cached", base.UD(oldestPending.DocID), oldestPending.Sequence, oldestPending.EndSequence, c.nextSequence)
+			oldestPending = c._popPendingLog(ctx)
+
+			// If the oldestPending was a range that extended past nextSequence, update nextSequence
+			if oldestPending.IsUnusedRange() && oldestPending.EndSequence >= c.nextSequence {
+				c.nextSequence = oldestPending.EndSequence + 1
+			}
 		} else if len(c.pendingLogs) > c.options.CachePendingSeqMaxNum || time.Since(c.pendingLogs[0].TimeReceived) >= c.options.CachePendingSeqMaxWait {
 			// Skip all sequences up to the oldest Pending
 			c.PushSkipped(ctx, c.nextSequence, oldestPending.Sequence-1)
-			// disallow c.nextSequence decreasing
-			if c.nextSequence < oldestPending.Sequence {
-				c.nextSequence = oldestPending.Sequence
-			}
+			c.nextSequence = oldestPending.Sequence
 		} else {
+			// nextSequence is not in pending logs, and pending logs size/age doesn't trigger skipped sequences
 			break
 		}
 	}
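// Illustrative aside, not part of the diff: a minimal sketch of the nextSequence
// bookkeeping added in the new else-if branch above, under the assumption that
// entryEnd > 0 stands in for LogEntry.IsUnusedRange(). The function name
// advanceNextSequence is hypothetical, not part of the changeCache API.
package main

import "fmt"

// advanceNextSequence mirrors the already-cached branch: entryStart/entryEnd describe the
// popped pending entry (entryEnd == 0 means a single sequence), next is the cache's current
// nextSequence. An unused range that still extends past nextSequence moves nextSequence
// beyond its end so the covered sequences are not re-awaited.
func advanceNextSequence(entryStart, entryEnd, next uint64) uint64 {
	if entryStart >= next {
		// Not the already-cached case handled by that branch; leave nextSequence alone.
		return next
	}
	isUnusedRange := entryEnd > 0
	if isUnusedRange && entryEnd >= next {
		// Range overlaps nextSequence, e.g. range [3-9] with nextSequence 5 -> 10.
		return entryEnd + 1
	}
	return next
}

func main() {
	fmt.Println(advanceNextSequence(3, 9, 5)) // 10: range extends past nextSequence
	fmt.Println(advanceNextSequence(3, 4, 5)) // 5: range entirely below nextSequence
	fmt.Println(advanceNextSequence(3, 0, 5)) // 5: single already-cached sequence
}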
@@ -905,6 +867,41 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
 	return changedChannels
 }
 
+// _popPendingLog pops the next pending LogEntry from the c.pendingLogs heap. When the popped entry is an unused range,
+// performs a defensive check for duplicates with the next entry in pending. If the unused range overlaps with the next entry,
+// reduces the unused range to stop at the next pending entry.
+func (c *changeCache) _popPendingLog(ctx context.Context) *LogEntry {
+	poppedEntry := heap.Pop(&c.pendingLogs).(*LogEntry)
+	// If it's not a range, no additional handling needed
+	if !poppedEntry.IsUnusedRange() {
+		return poppedEntry
+	}
+	// If there are no more pending logs, no additional handling needed
+	if len(c.pendingLogs) == 0 {
+		return poppedEntry
+	}
+
+	nextPendingEntry := c.pendingLogs[0]
+	// If popped entry range does not overlap with next pending entry, no additional handling needed
+	// e.g. popped [15-20], nextPendingEntry is [25]
+	if poppedEntry.EndSequence < nextPendingEntry.Sequence {
+		return poppedEntry
+	}
+
+	// If nextPendingEntry's sequence duplicates the start of the unused range, ignore the popped entry and return the next entry instead
+	// e.g. popped [15-20], nextPendingEntry is [15]
+	if poppedEntry.Sequence == nextPendingEntry.Sequence {
+		base.InfofCtx(ctx, base.KeyCache, "Unused sequence range in pendingLogs (%d, %d) has start equal to next pending sequence (%s, %d) - unused range will be ignored", poppedEntry.Sequence, poppedEntry.EndSequence, nextPendingEntry.DocID, nextPendingEntry.Sequence)
+		return c._popPendingLog(ctx)
+	}
+
+	// Otherwise, reduce the popped unused range to end before the next pending sequence
+	// e.g. popped [15-20], nextPendingEntry is [18]
+	base.InfofCtx(ctx, base.KeyCache, "Unused sequence range in pendingLogs (%d, %d) overlaps with next pending sequence (%s, %d) - unused range will be truncated", poppedEntry.Sequence, poppedEntry.EndSequence, nextPendingEntry.DocID, nextPendingEntry.Sequence)
+	poppedEntry.EndSequence = nextPendingEntry.Sequence - 1
+	return poppedEntry
+}
+
 func (c *changeCache) GetStableSequence(docID string) SequenceID {
 	// Stable sequence is independent of docID in changeCache
 	return SequenceID{Seq: c.LastSequence()}
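// Illustrative aside, not part of the diff: a standalone sketch of the duplicate handling
// performed by _popPendingLog, assuming pending entries are ordered by start sequence.
// Given a popped unused range and the start sequence of the next pending entry, the range
// is either kept as-is, truncated, or dropped. The helper name resolveUnusedRange and its
// bool return are illustrative only, not part of the change cache API.
package main

import "fmt"

// resolveUnusedRange returns the (possibly truncated) end of an unused range [start, end]
// and whether the range should be kept at all, given the start sequence of the next
// pending entry.
func resolveUnusedRange(start, end, nextPending uint64) (newEnd uint64, keep bool) {
	switch {
	case end < nextPending:
		// No overlap, e.g. popped [15-20], next pending 25: keep unchanged.
		return end, true
	case start == nextPending:
		// Next pending duplicates the start of the range, e.g. popped [15-20],
		// next pending 15: drop the unused range in favour of the real entry.
		return 0, false
	default:
		// Partial overlap, e.g. popped [15-20], next pending 18: truncate to [15-17].
		return nextPending - 1, true
	}
}

func main() {
	for _, next := range []uint64{25, 15, 18} {
		end, keep := resolveUnusedRange(15, 20, next)
		fmt.Printf("next=%d -> keep=%t end=%d\n", next, keep, end)
	}
}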
diff --git a/db/change_cache_test.go b/db/change_cache_test.go
index 832877e8be..17374f692f 100644
--- a/db/change_cache_test.go
+++ b/db/change_cache_test.go
@@ -9,6 +9,7 @@ package db
 
 import (
+	"container/heap"
 	"context"
 	"encoding/binary"
 	"errors"
@@ -2932,92 +2933,6 @@ func TestReleasedSequenceRangeHandlingEdgeCase2(t *testing.T) {
 	}, time.Second*10, time.Millisecond*100)
 }
 
-// TestReleasedSequenceRangeHandlingDuplicateSequencesInPending:
-// - Test releasing unused sequence range that should be pushed to pending
-// - Mock having a sequences in middle of the range already in pending
-// - Assert that the range pushed is split into separate ranges onto pending list
-// - Add new unused range to unblocks pending , assert it empties pending and skipped is not filled
-func TestReleasedSequenceRangeHandlingDuplicateSequencesInPending(t *testing.T) {
-	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache)
-
-	ctx := base.TestCtx(t)
-	bucket := base.GetTestBucket(t)
-	dbContext, err := NewDatabaseContext(ctx, "db", bucket, false, DatabaseContextOptions{
-		Scopes: GetScopesOptions(t, bucket, 1),
-	})
-	require.NoError(t, err)
-	defer dbContext.Close(ctx)
-
-	err = dbContext.StartOnlineProcesses(ctx)
-	require.NoError(t, err)
-
-	ctx = dbContext.AddDatabaseLogContext(ctx)
-	testChangeCache := &changeCache{}
-	if err := testChangeCache.Init(ctx, dbContext, dbContext.channelCache, nil, &CacheOptions{
-		CachePendingSeqMaxWait: 20 * time.Minute,
-		CacheSkippedSeqMaxWait: 20 * time.Minute,
-		CachePendingSeqMaxNum:  20,
-	}, dbContext.MetadataKeys); err != nil {
-		log.Printf("Init failed for testChangeCache: %v", err)
-		t.Fail()
-	}
-
-	if err := testChangeCache.Start(0); err != nil {
-		log.Printf("Start error for testChangeCache: %v", err)
-		t.Fail()
-	}
-	defer testChangeCache.Stop(ctx)
-	require.NoError(t, err)
-
-	// push two entries that will be pushed to pending
-	entry := &LogEntry{
-		Sequence:     14,
-		DocID:        fmt.Sprintf("doc_%d", 50),
-		RevID:        "1-abcdefabcdefabcdef",
-		TimeReceived: time.Now(),
-		TimeSaved:    time.Now(),
-	}
-	_ = testChangeCache.processEntry(ctx, entry)
-
-	entry = &LogEntry{
-		Sequence:     18,
-		DocID:        fmt.Sprintf("doc_%d", 50),
-		RevID:        "1-abcdefabcdefabcdef",
-		TimeReceived: time.Now(),
-		TimeSaved:    time.Now(),
-	}
-	_ = testChangeCache.processEntry(ctx, entry)
-
-	// process unusedSeq range with pending range containing duplicate sequences that are in pending list
-	// Pending should contain: (10-13), 14, (15-17) 18, (19-20)
-	testChangeCache.releaseUnusedSequenceRange(ctx, 10, 20, time.Now())
-
-	// assert pending has 4 elements
-	require.EventuallyWithT(t, func(c *assert.CollectT) {
-		testChangeCache.updateStats(ctx)
-		assert.Equal(c, int64(5), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
-		assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
-		assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
-		assert.Equal(c, uint64(1), testChangeCache.nextSequence)
-
-		dbContext.UpdateCalculatedStats(ctx)
-		assert.Equal(c, int64(20), dbContext.DbStats.CacheStats.HighSeqCached.Value())
-	}, time.Second*10, time.Millisecond*100)
-
-	// unblock pending and assert items are processed correct
-	testChangeCache.releaseUnusedSequenceRange(ctx, 1, 10, time.Now())
-
-	// assert pending empties
-	require.EventuallyWithT(t, func(c *assert.CollectT) {
-		testChangeCache.updateStats(ctx)
-		assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
-		assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
-		assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
-		assert.Equal(c, uint64(21), testChangeCache.nextSequence)
-		dbContext.UpdateCalculatedStats(ctx)
-		assert.Equal(c, int64(20), dbContext.DbStats.CacheStats.HighSeqCached.Value())
-	}, time.Second*10, time.Millisecond*100)
-}
-
 // TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped:
 // - Test releasing unused sequence range that has duplicate sequences in skipped sequence list
 // - Assert sequences in the range are removed from skipped
@@ -3135,3 +3050,229 @@ func TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped(t *testing.T)
 		assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.HighSeqCached.Value())
 	}, time.Second*10, time.Millisecond*100)
 }
+
+// TestAddPendingLogs:
+// - Test age-based eviction of sequences and ranges from pending logs.
+// - Adds to pending logs directly via heap.Push with backdated TimeReceived,
+//   triggers eviction with a call to _addPendingLogs
+// - tests duplicate handling when popping pending entries
+// - reproduces CBG-4215
+func TestAddPendingLogs(t *testing.T) {
+	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache, base.KeyChanges)
+
+	ctx := base.TestCtx(t)
+	bucket := base.GetTestBucket(t)
+	dbContext, err := NewDatabaseContext(ctx, "db", bucket, false, DatabaseContextOptions{
+		Scopes: GetScopesOptions(t, bucket, 1),
+	})
+	require.NoError(t, err)
+	defer dbContext.Close(ctx)
+
+	ctx = dbContext.AddDatabaseLogContext(ctx)
+	err = dbContext.StartOnlineProcesses(ctx)
+	require.NoError(t, err)
+
+	testCases := []struct {
+		incoming             []sequenceRange // test simulates adding these to pending in the order found in the slice
+		channelName          string          // non-range values (endSeq=0) will be assigned to this channel for verification of proper caching
+		expectedCached       []uint64        // expected cached sequences
+		expectedSkipped      []sequenceRange // expected skipped sequence ranges
+		expectedNextSequence uint64          // expected cache.nextSequence
+	}{
+		{
+			// single range
+			incoming:             []sequenceRange{{2, 4}},
+			expectedNextSequence: 5,
+			expectedSkipped:      []sequenceRange{{1, 1}},
+		},
+		{
+			// multiple non-overlapping ranges arriving out of sequence order
+			incoming:             []sequenceRange{{2, 4}, {9, 10}, {6, 8}, {14, 20}, {11, 13}, {5, 5}},
+			expectedNextSequence: 21,
+			expectedSkipped:      []sequenceRange{{1, 1}},
+		},
+		{
+			// non overlapping ranges, arrive in sequence order
+			incoming:             []sequenceRange{{2, 4}, {5, 8}},
+			expectedNextSequence: 9,
+			expectedSkipped:      []sequenceRange{{1, 1}},
+		},
+		{
+			// non overlapping ranges, arrive out of sequence order
+			incoming:             []sequenceRange{{5, 8}, {2, 4}},
+			expectedNextSequence: 9,
+			expectedSkipped:      []sequenceRange{{1, 1}},
+		},
+		{
+			// range overlaps single, range arrives first
+			incoming:             []sequenceRange{{2, 4}, {3, 0}},
+			channelName:          "A",
+			expectedNextSequence: 4,
+			expectedCached:       []uint64{3},
+			expectedSkipped:      []sequenceRange{{1, 1}},
+		},
+		{
+			// single arrives, then a range that overlaps it on both sides
+			incoming:             []sequenceRange{{3, 3}, {2, 4}},
+			expectedNextSequence: 4,
+			expectedSkipped:      []sequenceRange{{1, 1}},
+		},
+		{
+			// range arrives then another that completely covers range in the pending list
+			incoming:             []sequenceRange{{4, 8}, {2, 10}},
+			expectedNextSequence: 9,
+			expectedSkipped:      []sequenceRange{{1, 1}},
+		},
+		{
+			// overlapping ranges, low range arrives first
+			incoming:             []sequenceRange{{4, 8}, {6, 10}},
+			expectedNextSequence: 11,
+			expectedSkipped:      []sequenceRange{{1, 3}},
+		},
+		{
+			// completely overlapping ranges, larger range arrives first
+			incoming:             []sequenceRange{{4, 8}, {6, 8}},
+			expectedNextSequence: 9,
+			expectedSkipped:      []sequenceRange{{1, 3}},
+		},
+		{
+			// range arrives then partly overlapping range arrives
+			incoming:             []sequenceRange{{4, 8}, {6, 10}},
+			expectedNextSequence: 11,
+			expectedSkipped:      []sequenceRange{{1, 3}},
+		},
+		{
+			// range arrives, partly overlapping range arrives, then a single sequence within the overlapping range
+			incoming:             []sequenceRange{{4, 8}, {8, 10}, {9, 0}},
+			channelName:          "B",
+			expectedNextSequence: 10,
+			expectedCached:       []uint64{9},
+			expectedSkipped:      []sequenceRange{{1, 3}},
+		},
+		{
+			// single sequence arrives, left side overlapping range arrives
+			incoming:             []sequenceRange{{4, 0}, {2, 4}},
+			expectedNextSequence: 5,
+			expectedCached:       []uint64{4},
+			expectedSkipped:      []sequenceRange{{1, 1}},
+		},
+		{
+			// single sequence arrives, right side overlapping range arrives
+			incoming:             []sequenceRange{{4, 0}, {4, 5}},
+			channelName:          "C",
+			expectedNextSequence: 6,
+			expectedSkipped:      []sequenceRange{{1, 3}},
+		},
+		{
+			// range arrives, lower end overlapping range arrives
+			incoming:             []sequenceRange{{6, 10}, {4, 8}},
+			expectedNextSequence: 11,
+			expectedSkipped:      []sequenceRange{{1, 3}},
+		},
+	}
+
+	for index, testCase := range testCases {
+		t.Run(fmt.Sprintf("case_%d", index), func(t *testing.T) {
+			testChannelID := channels.NewID(testCase.channelName, GetSingleDatabaseCollection(t, dbContext).GetCollectionID())
+			testChangeCache := &changeCache{}
+			if err := testChangeCache.Init(ctx, dbContext, dbContext.channelCache, nil, &CacheOptions{
+				CachePendingSeqMaxWait: 1 * time.Minute,
+				CacheSkippedSeqMaxWait: 20 * time.Minute,
+				CachePendingSeqMaxNum:  10,
+			}, dbContext.MetadataKeys); err != nil {
+				log.Printf("Init failed for testChangeCache: %v", err)
+				t.Fail()
+			}
+
+			if err := testChangeCache.Start(0); err != nil {
+				log.Printf("Start error for testChangeCache: %v", err)
+				t.Fail()
+			}
+			defer testChangeCache.Stop(ctx)
+			require.NoError(t, err)
+
+			// If we expect cached entries, perform a get to warm the cache for the channel
+			if len(testCase.expectedCached) > 0 {
+				cachedEntries, err := testChangeCache.getChannelCache().GetCachedChanges(ctx, testChannelID)
+				require.NoError(t, err)
+				require.Equal(t, 0, len(cachedEntries))
+			}
+
+			// process overlapping unused sequence ranges that should end up going to pending without duplicates
+			// acquire cache lock to push to pending logs
+			testChangeCache.lock.Lock()
+			backdatedTimeReceived := time.Now().Add(-1 * time.Hour)
+			for i, incomingRange := range testCase.incoming {
+				if incomingRange.end == 0 {
+					// treat as a document pushed to pending
+					logEntry := logEntry(incomingRange.start, fmt.Sprintf("doc%d", i), "1-abc", []string{testChannelID.Name}, testChannelID.CollectionID)
+					logEntry.TimeReceived = backdatedTimeReceived
+					heap.Push(&testChangeCache.pendingLogs, logEntry)
+				} else {
+					testChangeCache._pushRangeToPending(ctx, incomingRange.start, incomingRange.end, backdatedTimeReceived)
+				}
+			}
+			// Call _addPendingLogs to trigger eviction from pendingLogs based on age
+			_ = testChangeCache._addPendingLogs(ctx)
+			assert.Equal(t, int(testCase.expectedNextSequence), int(testChangeCache.nextSequence), "Cache nextSequence doesn't match expected")
+			testChangeCache.lock.Unlock()
+			if len(testCase.expectedCached) > 0 {
+				cachedEntries, err := testChangeCache.getChannelCache().GetCachedChanges(ctx, testChannelID)
+				require.NoError(t, err)
+				require.Equal(t, len(testCase.expectedCached), len(cachedEntries))
+			}
+			if len(testCase.expectedSkipped) > 0 {
+				require.Equal(t, len(testCase.expectedSkipped), len(testChangeCache.skippedSeqs.list), "Number of skipped sequence entries doesn't match expected")
+				for i, skippedEntry := range testChangeCache.skippedSeqs.list {
+					expectedEntry := testCase.expectedSkipped[i]
+					assert.Equal(t, expectedEntry.start, skippedEntry.start, "skipped entry start mismatch")
+					assert.Equal(t, expectedEntry.end, skippedEntry.end, "skipped entry end mismatch")
+				}
+			}
+
+		})
+	}
+
+}
+
+type sequenceRange struct {
+	start uint64
+	end   uint64
+}
+
+func ExpectedRange(start, end uint64) sequenceRange {
+	return sequenceRange{
+		start: start,
+		end:   end,
+	}
+}
+
+// AssertPendingLogs validates the ordering of the provided LogPriorityQueue against the ordered expectedPending slice.
+// We don't want to modify the incoming LogPriorityQueue, so it makes a copy and then performs heap removal
+func AssertPendingLogs(t *testing.T, pendingLogs LogPriorityQueue, expectedPending []sequenceRange) {
+
+	pendingCopy := make(LogPriorityQueue, len(pendingLogs))
+	_ = copy(pendingCopy, pendingLogs)
+	if len(pendingCopy) != len(expectedPending) {
+		t.Errorf("Mismatch between length of pendingLogs and expectedPending. pendingLogs: %s, expectedPending: %v", pendingLogsAsString(pendingCopy), expectedPending)
+		return
+	}
+
+	for i, expectedEntry := range expectedPending {
+		pendingEntry := heap.Pop(&pendingCopy).(*LogEntry)
+		assert.True(t, expectedEntry.start == pendingEntry.Sequence && expectedEntry.end == pendingEntry.EndSequence, "entry #%d, expected [%d,%d], got [%d,%d]", i, expectedEntry.start, expectedEntry.end, pendingEntry.Sequence, pendingEntry.EndSequence)
+	}
+}
+
+func pendingLogsAsString(pendingLogs LogPriorityQueue) string {
+	count := len(pendingLogs)
+	result := "{"
+	delimiter := ""
+	for i := 0; i < count; i++ {
+		pendingEntry := heap.Pop(&pendingLogs).(*LogEntry)
+		result += fmt.Sprintf("%s[%d, %d]", delimiter, pendingEntry.Sequence, pendingEntry.EndSequence)
+		delimiter = ","
+	}
+	result += "}"
+	return result
+}
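// Illustrative aside, not part of the diff: a standalone sketch of the copy-then-drain
// pattern used by AssertPendingLogs and pendingLogsAsString above. Copying the backing
// slice lets the heap be popped in order for verification without mutating the cache's
// real pending queue. The intHeap type here is an assumption for the example; the real
// helpers operate on LogPriorityQueue.
package main

import (
	"container/heap"
	"fmt"
)

type intHeap []int

func (h intHeap) Len() int            { return len(h) }
func (h intHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() interface{} {
	old := *h
	n := len(old)
	v := old[n-1]
	*h = old[:n-1]
	return v
}

// drainInOrder copies the heap's backing slice (which preserves the heap invariant)
// and pops the copy, leaving the original heap untouched.
func drainInOrder(h intHeap) []int {
	cp := make(intHeap, len(h))
	copy(cp, h)
	ordered := make([]int, 0, len(cp))
	for cp.Len() > 0 {
		ordered = append(ordered, heap.Pop(&cp).(int))
	}
	return ordered
}

func main() {
	h := intHeap{5, 2, 9}
	heap.Init(&h)
	fmt.Println(drainInOrder(h)) // [2 5 9]
	fmt.Println(len(h))          // still 3: original heap is unchanged
}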