From b6127c1bf11a4231f3758dd63654256a92470028 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Tue, 3 Sep 2024 17:17:44 -0700 Subject: [PATCH] CBG-4218 [3.1.11 backport] Move duplicate checks for pending sequences to read side (#7105) Reduces complexity and avoids loop bug when duplicates are present when running _addPendingLogs. When popping an unused sequence range from pendingLogs, checks for overlap with the next entry in pending. When found, truncates range at next pending entry. Adds check in addPendingLogs for the case where the pending includes sequences earlier than nextSequence. Info logging emitted for any unexpected duplication. --- db/change_cache.go | 121 ++++++++-------- db/change_cache_test.go | 313 +++++++++++++++++++++++++++++----------- 2 files changed, 286 insertions(+), 148 deletions(-) diff --git a/db/change_cache.go b/db/change_cache.go index 83e0ce5a61..e4b581c919 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -16,7 +16,6 @@ import ( "errors" "fmt" "math" - "sort" "strconv" "strings" "sync" @@ -126,6 +125,10 @@ func (entry *LogEntry) SetDeleted() { entry.Flags |= channels.Deleted } +func (entry *LogEntry) IsUnusedRange() bool { + return entry.DocID == "" && entry.EndSequence > 0 +} + type LogEntries []*LogEntry // A priority-queue of LogEntries, kept ordered by increasing sequence #. @@ -633,71 +636,23 @@ func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSe // isn't possible under normal processing - unused sequence ranges will normally be moved // from pending to skipped in their entirety, as it's the processing of the pending sequence // *after* the range that triggers the range to be skipped. A partial range in skipped means - // an duplicate entry with a sequence within the bounds of the range was previously present + // a duplicate entry with a sequence within the bounds of the range was previously present // in pending. - base.WarnfCtx(ctx, "unused sequence range of #%d to %d contains duplicate sequences", fromSequence, toSequence) + base.WarnfCtx(ctx, "unused sequence range of #%d to %d contains duplicate sequences, will be ignored", fromSequence, toSequence) } return allChangedChannels } -// _pushRangeToPending will push a sequence range to pending logs. If pending has entries in it, we will check if -// those entries are in the range and handle it, so we don't push duplicate sequences to pending +// _pushRangeToPending will push a sequence range to pendingLogs func (c *changeCache) _pushRangeToPending(ctx context.Context, startSeq, endSeq uint64, timeReceived time.Time) { - if c.pendingLogs.Len() == 0 { - // push whole range & return early to avoid duplicate checks - entry := &LogEntry{ - TimeReceived: timeReceived, - Sequence: startSeq, - EndSequence: endSeq, - } - heap.Push(&c.pendingLogs, entry) - return - } - // check for duplicate sequences between range and pending logs - // loop till we have processed unused sequence range (or until we - // have range of sequences that aren't present in pending list) - for startSeq <= endSeq { - i, found := sort.Find(c.pendingLogs.Len(), func(i int) int { - value := c.pendingLogs[i] - if value.Sequence > endSeq { - // range is less than current pending entry - return -1 - } - if startSeq <= value.Sequence && endSeq >= value.Sequence { - // found pending entry that has duplicate entry between itself and unused range - return 0 - } - // range is larger then current element - return 1 - }) - if found { - // grab pending entry at that index and process unused range between startSeq and pending entry.Sequence - 1 - pendingEntry := c.pendingLogs[i] - base.DebugfCtx(ctx, base.KeyCache, "Ignoring duplicate of #%d (unusedSequence)", pendingEntry.Sequence) - entry := &LogEntry{ - TimeReceived: timeReceived, - Sequence: startSeq, - EndSequence: pendingEntry.Sequence - 1, - } - heap.Push(&c.pendingLogs, entry) - // update start seq on range - startSeq = pendingEntry.Sequence + 1 - } else { - // if range not found in pending then break from loop early - break - } + entry := &LogEntry{ + TimeReceived: timeReceived, + Sequence: startSeq, + EndSequence: endSeq, } + heap.Push(&c.pendingLogs, entry) - // push what's left of seq range - if startSeq <= endSeq { - entry := &LogEntry{ - TimeReceived: timeReceived, - Sequence: startSeq, - EndSequence: endSeq, - } - heap.Push(&c.pendingLogs, entry) - } } // Process unused sequence notification. Extracts sequence from docID and sends to cache for buffering @@ -885,16 +840,23 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set { isNext = oldestPending.Sequence == c.nextSequence if isNext { - heap.Pop(&c.pendingLogs) + oldestPending = c._popPendingLog(ctx) changedChannels = changedChannels.UpdateWithSlice(c._addToCache(ctx, oldestPending)) + } else if oldestPending.Sequence < c.nextSequence { + // oldest pending is lower than next sequence, should be ignored + base.InfofCtx(ctx, base.KeyCache, "Oldest entry in pending logs %v (%d, %d) is earlier than cache next sequence (%d), ignoring as sequence has already been cached", base.UD(oldestPending.DocID), oldestPending.Sequence, oldestPending.EndSequence, c.nextSequence) + oldestPending = c._popPendingLog(ctx) + + // If the oldestPending was a range that extended past nextSequence, update nextSequence + if oldestPending.IsUnusedRange() && oldestPending.EndSequence >= c.nextSequence { + c.nextSequence = oldestPending.EndSequence + 1 + } } else if len(c.pendingLogs) > c.options.CachePendingSeqMaxNum || time.Since(c.pendingLogs[0].TimeReceived) >= c.options.CachePendingSeqMaxWait { // Skip all sequences up to the oldest Pending c.PushSkipped(ctx, c.nextSequence, oldestPending.Sequence-1) - // disallow c.nextSequence decreasing - if c.nextSequence < oldestPending.Sequence { - c.nextSequence = oldestPending.Sequence - } + c.nextSequence = oldestPending.Sequence } else { + // nextSequence is not in pending logs, and pending logs size/age doesn't trigger skipped sequences break } } @@ -905,6 +867,41 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set { return changedChannels } +// _popPendingLog pops the next pending LogEntry from the c.pendingLogs heap. When the popped entry is an unused range, +// performs a defensive check for duplicates with the next entry in pending. If unused range overlaps with next entry, +// reduces the unused range to stop at the next pending entry. +func (c *changeCache) _popPendingLog(ctx context.Context) *LogEntry { + poppedEntry := heap.Pop(&c.pendingLogs).(*LogEntry) + // If it's not a range, no additional handling needed + if !poppedEntry.IsUnusedRange() { + return poppedEntry + } + // If there are no more pending logs, no additional handling needed + if len(c.pendingLogs) == 0 { + return poppedEntry + } + + nextPendingEntry := c.pendingLogs[0] + // If popped entry range does not overlap with next pending entry, no additional handling needed + // e.g. popped [15-20], nextPendingEntry is [25] + if poppedEntry.EndSequence < nextPendingEntry.Sequence { + return poppedEntry + } + + // If nextPendingEntry's sequence duplicates the start of the unused range, ignored popped entry and return next entry instead + // e.g. popped [15-20], nextPendingEntry is [15] + if poppedEntry.Sequence == nextPendingEntry.Sequence { + base.InfofCtx(ctx, base.KeyCache, "Unused sequence range in pendingLogs (%d, %d) has start equal to next pending sequence (%s, %d) - unused range will be ignored", poppedEntry.Sequence, poppedEntry.EndSequence, nextPendingEntry.DocID, nextPendingEntry.Sequence) + return c._popPendingLog(ctx) + } + + // Otherwise, reduce the popped unused range to end before the next pending sequence + // e.g. popped [15-20], nextPendingEntry is [18] + base.InfofCtx(ctx, base.KeyCache, "Unused sequence range in pendingLogs (%d, %d) overlaps with next pending sequence (%s, %d) - unused range will be truncated", poppedEntry.Sequence, poppedEntry.EndSequence, nextPendingEntry.DocID, nextPendingEntry.Sequence) + poppedEntry.EndSequence = nextPendingEntry.Sequence - 1 + return poppedEntry +} + func (c *changeCache) GetStableSequence(docID string) SequenceID { // Stable sequence is independent of docID in changeCache return SequenceID{Seq: c.LastSequence()} diff --git a/db/change_cache_test.go b/db/change_cache_test.go index 832877e8be..17374f692f 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -9,6 +9,7 @@ package db import ( + "container/heap" "context" "encoding/binary" "errors" @@ -2932,92 +2933,6 @@ func TestReleasedSequenceRangeHandlingEdgeCase2(t *testing.T) { }, time.Second*10, time.Millisecond*100) } -// TestReleasedSequenceRangeHandlingDuplicateSequencesInPending: -// - Test releasing unused sequence range that should be pushed to pending -// - Mock having a sequences in middle of the range already in pending -// - Assert that the range pushed is split into separate ranges onto pending list -// - Add new unused range to unblocks pending , assert it empties pending and skipped is not filled -func TestReleasedSequenceRangeHandlingDuplicateSequencesInPending(t *testing.T) { - base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache) - - ctx := base.TestCtx(t) - bucket := base.GetTestBucket(t) - dbContext, err := NewDatabaseContext(ctx, "db", bucket, false, DatabaseContextOptions{ - Scopes: GetScopesOptions(t, bucket, 1), - }) - require.NoError(t, err) - defer dbContext.Close(ctx) - - err = dbContext.StartOnlineProcesses(ctx) - require.NoError(t, err) - - ctx = dbContext.AddDatabaseLogContext(ctx) - testChangeCache := &changeCache{} - if err := testChangeCache.Init(ctx, dbContext, dbContext.channelCache, nil, &CacheOptions{ - CachePendingSeqMaxWait: 20 * time.Minute, - CacheSkippedSeqMaxWait: 20 * time.Minute, - CachePendingSeqMaxNum: 20, - }, dbContext.MetadataKeys); err != nil { - log.Printf("Init failed for testChangeCache: %v", err) - t.Fail() - } - - if err := testChangeCache.Start(0); err != nil { - log.Printf("Start error for testChangeCache: %v", err) - t.Fail() - } - defer testChangeCache.Stop(ctx) - require.NoError(t, err) - - // push two entries that will be pushed to pending - entry := &LogEntry{ - Sequence: 14, - DocID: fmt.Sprintf("doc_%d", 50), - RevID: "1-abcdefabcdefabcdef", - TimeReceived: time.Now(), - TimeSaved: time.Now(), - } - _ = testChangeCache.processEntry(ctx, entry) - - entry = &LogEntry{ - Sequence: 18, - DocID: fmt.Sprintf("doc_%d", 50), - RevID: "1-abcdefabcdefabcdef", - TimeReceived: time.Now(), - TimeSaved: time.Now(), - } - _ = testChangeCache.processEntry(ctx, entry) - - // process unusedSeq range with pending range containing duplicate sequences that are in pending list - // Pending should contain: (10-13), 14, (15-17) 18, (19-20) - testChangeCache.releaseUnusedSequenceRange(ctx, 10, 20, time.Now()) - - // assert pending has 4 elements - require.EventuallyWithT(t, func(c *assert.CollectT) { - testChangeCache.updateStats(ctx) - assert.Equal(c, int64(5), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) - assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) - assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) - assert.Equal(c, uint64(1), testChangeCache.nextSequence) - dbContext.UpdateCalculatedStats(ctx) - assert.Equal(c, int64(20), dbContext.DbStats.CacheStats.HighSeqCached.Value()) - }, time.Second*10, time.Millisecond*100) - - // unblock pending and assert items are processed correct - testChangeCache.releaseUnusedSequenceRange(ctx, 1, 10, time.Now()) - - // assert pending empties - require.EventuallyWithT(t, func(c *assert.CollectT) { - testChangeCache.updateStats(ctx) - assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) - assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) - assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) - assert.Equal(c, uint64(21), testChangeCache.nextSequence) - dbContext.UpdateCalculatedStats(ctx) - assert.Equal(c, int64(20), dbContext.DbStats.CacheStats.HighSeqCached.Value()) - }, time.Second*10, time.Millisecond*100) -} - // TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped: // - Test releasing unused sequence range that has duplicate sequences in skipped sequence list // - Assert sequences in the range are removed from skipped @@ -3135,3 +3050,229 @@ func TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped(t *testing.T) assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.HighSeqCached.Value()) }, time.Second*10, time.Millisecond*100) } + +// TestAddPendingLogs: +// - Test age-based eviction of sequences and ranges from pending logs. +// - Adds to pending logs directly via heap.Push with backdated TimeReceived, +// triggers eviction with call to _addPendingLogs +// - tests duplicate handling when popping pending entries +// - reproduces CBG-4215 +func TestAddPendingLogs(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache, base.KeyChanges) + + ctx := base.TestCtx(t) + bucket := base.GetTestBucket(t) + dbContext, err := NewDatabaseContext(ctx, "db", bucket, false, DatabaseContextOptions{ + Scopes: GetScopesOptions(t, bucket, 1), + }) + require.NoError(t, err) + defer dbContext.Close(ctx) + + ctx = dbContext.AddDatabaseLogContext(ctx) + err = dbContext.StartOnlineProcesses(ctx) + require.NoError(t, err) + + testCases := []struct { + incoming []sequenceRange // test simulates adding these to pending in the order found in the slice + channelName string // non-range values (endSeq=0) will be assigned to this channel for verification of proper caching + expectedCached []uint64 // expected cached sequences + expectedSkipped []sequenceRange // expected skipped sequence ranges + expectedNextSequence uint64 // expected cache.nextSequence + }{ + { + // single range + incoming: []sequenceRange{{2, 4}}, + expectedNextSequence: 5, + expectedSkipped: []sequenceRange{{1, 1}}, + }, + { + // multiple non-overlapping ranges arriving out of sequence order + incoming: []sequenceRange{{2, 4}, {9, 10}, {6, 8}, {14, 20}, {11, 13}, {5, 5}}, + expectedNextSequence: 21, + expectedSkipped: []sequenceRange{{1, 1}}, + }, + { + // non overlapping ranges, arrive in sequence order + incoming: []sequenceRange{{2, 4}, {5, 8}}, + expectedNextSequence: 9, + expectedSkipped: []sequenceRange{{1, 1}}, + }, + { + // non overlapping ranges, arrive out of sequence order + incoming: []sequenceRange{{5, 8}, {2, 4}}, + expectedNextSequence: 9, + expectedSkipped: []sequenceRange{{1, 1}}, + }, + { + // range overlaps single, range arrives first + incoming: []sequenceRange{{2, 4}, {3, 0}}, + channelName: "A", + expectedNextSequence: 4, + expectedCached: []uint64{3}, + expectedSkipped: []sequenceRange{{1, 1}}, + }, + { + // single arrives then range overlaps single both sides + incoming: []sequenceRange{{3, 3}, {2, 4}}, + expectedNextSequence: 4, + expectedSkipped: []sequenceRange{{1, 1}}, + }, + { + // range arrives then another that completely covers range in the pending list + incoming: []sequenceRange{{4, 8}, {2, 10}}, + expectedNextSequence: 9, + expectedSkipped: []sequenceRange{{1, 1}}, + }, + { + // overlapping ranges, low range arrives first + incoming: []sequenceRange{{4, 8}, {6, 10}}, + expectedNextSequence: 11, + expectedSkipped: []sequenceRange{{1, 3}}, + }, + { + // completely overlapping ranges, larger range arrives first + incoming: []sequenceRange{{4, 8}, {6, 8}}, + expectedNextSequence: 9, + expectedSkipped: []sequenceRange{{1, 3}}, + }, + { + // range arrives then partly overlapping range arrives + incoming: []sequenceRange{{4, 8}, {6, 10}}, + expectedNextSequence: 11, + expectedSkipped: []sequenceRange{{1, 3}}, + }, + { + // range arrives, partly overlapping range arrives the single overlapping range + incoming: []sequenceRange{{4, 8}, {8, 10}, {9, 0}}, + channelName: "B", + expectedNextSequence: 10, + expectedCached: []uint64{9}, + expectedSkipped: []sequenceRange{{1, 3}}, + }, + { + // single range arrives, left side overlapping range arrives + incoming: []sequenceRange{{4, 0}, {2, 4}}, + expectedNextSequence: 5, + expectedCached: []uint64{4}, + expectedSkipped: []sequenceRange{{1, 1}}, + }, + { + // single sequence arrives, right side overlapping range arrives + incoming: []sequenceRange{{4, 0}, {4, 5}}, + channelName: "C", + expectedNextSequence: 6, + expectedSkipped: []sequenceRange{{1, 3}}, + }, + { + // range arrives, lower end overlapping range arrives + incoming: []sequenceRange{{6, 10}, {4, 8}}, + expectedNextSequence: 11, + expectedSkipped: []sequenceRange{{1, 3}}, + }, + } + + for index, testCase := range testCases { + t.Run(fmt.Sprintf("case_%d", index), func(t *testing.T) { + testChannelID := channels.NewID(testCase.channelName, GetSingleDatabaseCollection(t, dbContext).GetCollectionID()) + testChangeCache := &changeCache{} + if err := testChangeCache.Init(ctx, dbContext, dbContext.channelCache, nil, &CacheOptions{ + CachePendingSeqMaxWait: 1 * time.Minute, + CacheSkippedSeqMaxWait: 20 * time.Minute, + CachePendingSeqMaxNum: 10, + }, dbContext.MetadataKeys); err != nil { + log.Printf("Init failed for testChangeCache: %v", err) + t.Fail() + } + + if err := testChangeCache.Start(0); err != nil { + log.Printf("Start error for testChangeCache: %v", err) + t.Fail() + } + defer testChangeCache.Stop(ctx) + require.NoError(t, err) + + // If we expect cached entries, perform a get to warm the cache for the channel + if len(testCase.expectedCached) > 0 { + cachedEntries, err := testChangeCache.getChannelCache().GetCachedChanges(ctx, testChannelID) + require.NoError(t, err) + require.Equal(t, 0, len(cachedEntries)) + } + + // process overlapping unused sequence ranges that should end up going to pending without duplicates + // acquire cache lock to push to pending logs + testChangeCache.lock.Lock() + backdatedTimeReceived := time.Now().Add(-1 * time.Hour) + for i, incomingRange := range testCase.incoming { + if incomingRange.end == 0 { + // treat as a document pushed to pending + logEntry := logEntry(incomingRange.start, fmt.Sprintf("doc%d", i), "1-abc", []string{testChannelID.Name}, testChannelID.CollectionID) + logEntry.TimeReceived = backdatedTimeReceived + heap.Push(&testChangeCache.pendingLogs, logEntry) + } else { + testChangeCache._pushRangeToPending(ctx, incomingRange.start, incomingRange.end, backdatedTimeReceived) + } + } + // Call _addPendingLogs to trigger eviction from pendingLogs based on age + _ = testChangeCache._addPendingLogs(ctx) + assert.Equal(t, int(testCase.expectedNextSequence), int(testChangeCache.nextSequence), "Cache nextSequence doesn't match expected") + testChangeCache.lock.Unlock() + if len(testCase.expectedCached) > 0 { + cachedEntries, err := testChangeCache.getChannelCache().GetCachedChanges(ctx, testChannelID) + require.NoError(t, err) + require.Equal(t, len(testCase.expectedCached), len(cachedEntries)) + } + if len(testCase.expectedSkipped) > 0 { + require.Equal(t, len(testCase.expectedSkipped), len(testChangeCache.skippedSeqs.list), "Number of skipped sequence entries doesn't match expected") + for i, skippedEntry := range testChangeCache.skippedSeqs.list { + expectedEntry := testCase.expectedSkipped[i] + assert.Equal(t, expectedEntry.start, skippedEntry.start, "skipped entry start mismatch") + assert.Equal(t, expectedEntry.end, skippedEntry.end, "skipped entry end mismatch") + } + } + + }) + } + +} + +type sequenceRange struct { + start uint64 + end uint64 +} + +func ExpectedRange(start, end uint64) sequenceRange { + return sequenceRange{ + start: start, + end: end, + } +} + +// AssertPendingLogs validates the ordering of the provided LogPriorityQueue against the ordered expectedPending slice. +// We don't want to modify the incoming LogPriorityQueue, so makes a copy and then performs heap removal +func AssertPendingLogs(t *testing.T, pendingLogs LogPriorityQueue, expectedPending []sequenceRange) { + + pendingCopy := make(LogPriorityQueue, len(pendingLogs)) + _ = copy(pendingCopy, pendingLogs) + if len(pendingCopy) != len(expectedPending) { + t.Errorf("Mismatch between length of pendingLogs and expectedPending. pendingLogs: %s, expectedPending: %v", pendingLogsAsString(pendingCopy), expectedPending) + return + } + + for i, expectedEntry := range expectedPending { + pendingEntry := heap.Pop(&pendingCopy).(*LogEntry) + assert.True(t, expectedEntry.start == pendingEntry.Sequence && expectedEntry.end == pendingEntry.EndSequence, "entry #%d, expected [%d,%d], got [%d,%d]", i, expectedEntry.start, expectedEntry.end, pendingEntry.Sequence, pendingEntry.EndSequence) + } +} + +func pendingLogsAsString(pendingLogs LogPriorityQueue) string { + count := len(pendingLogs) + result := "{" + delimiter := "" + for i := 0; i < count; i++ { + pendingEntry := heap.Pop(&pendingLogs).(*LogEntry) + result += fmt.Sprintf("%s[%d, %d]", delimiter, pendingEntry.Sequence, pendingEntry.EndSequence) + delimiter = "," + } + result += "}" + return result +}