Skip to content

Commit

Permalink
CBG-4218 [3.1.11 backport] Move duplicate checks for pending sequence…
Browse files Browse the repository at this point in the history
…s to read side (#7105)

Reduces complexity and avoids a loop bug that occurs when duplicates are present while running _addPendingLogs.

When popping an unused sequence range from pendingLogs, checks for overlap with the next entry in pendingLogs. When an overlap is found, truncates the range at the next pending entry.

Adds a check in _addPendingLogs for the case where pendingLogs includes sequences earlier than nextSequence.

Info logging emitted for any unexpected duplication.
  • Loading branch information
adamcfraser authored Sep 4, 2024
1 parent edcbaeb commit b6127c1
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 148 deletions.
121 changes: 59 additions & 62 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"errors"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -126,6 +125,10 @@ func (entry *LogEntry) SetDeleted() {
entry.Flags |= channels.Deleted
}

// IsUnusedRange reports whether the entry describes an unused sequence range, i.e. it
// carries no document ID and its EndSequence is greater than zero.
func (entry *LogEntry) IsUnusedRange() bool {
	// An entry with a document ID is never treated as an unused range.
	if entry.DocID != "" {
		return false
	}
	return entry.EndSequence > 0
}

type LogEntries []*LogEntry

// A priority-queue of LogEntries, kept ordered by increasing sequence #.
Expand Down Expand Up @@ -633,71 +636,23 @@ func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSe
// isn't possible under normal processing - unused sequence ranges will normally be moved
// from pending to skipped in their entirety, as it's the processing of the pending sequence
// *after* the range that triggers the range to be skipped. A partial range in skipped means
// an duplicate entry with a sequence within the bounds of the range was previously present
// a duplicate entry with a sequence within the bounds of the range was previously present
// in pending.
base.WarnfCtx(ctx, "unused sequence range of #%d to %d contains duplicate sequences", fromSequence, toSequence)
base.WarnfCtx(ctx, "unused sequence range of #%d to %d contains duplicate sequences, will be ignored", fromSequence, toSequence)
}
return allChangedChannels
}

// _pushRangeToPending will push a sequence range to pending logs. If pending has entries in it, we will check if
// those entries are in the range and handle it, so we don't push duplicate sequences to pending
// _pushRangeToPending will push a sequence range to pendingLogs
func (c *changeCache) _pushRangeToPending(ctx context.Context, startSeq, endSeq uint64, timeReceived time.Time) {
if c.pendingLogs.Len() == 0 {
// push whole range & return early to avoid duplicate checks
entry := &LogEntry{
TimeReceived: timeReceived,
Sequence: startSeq,
EndSequence: endSeq,
}
heap.Push(&c.pendingLogs, entry)
return
}

// check for duplicate sequences between range and pending logs
// loop till we have processed unused sequence range (or until we
// have range of sequences that aren't present in pending list)
for startSeq <= endSeq {
i, found := sort.Find(c.pendingLogs.Len(), func(i int) int {
value := c.pendingLogs[i]
if value.Sequence > endSeq {
// range is less than current pending entry
return -1
}
if startSeq <= value.Sequence && endSeq >= value.Sequence {
// found pending entry that has duplicate entry between itself and unused range
return 0
}
// range is larger then current element
return 1
})
if found {
// grab pending entry at that index and process unused range between startSeq and pending entry.Sequence - 1
pendingEntry := c.pendingLogs[i]
base.DebugfCtx(ctx, base.KeyCache, "Ignoring duplicate of #%d (unusedSequence)", pendingEntry.Sequence)
entry := &LogEntry{
TimeReceived: timeReceived,
Sequence: startSeq,
EndSequence: pendingEntry.Sequence - 1,
}
heap.Push(&c.pendingLogs, entry)
// update start seq on range
startSeq = pendingEntry.Sequence + 1
} else {
// if range not found in pending then break from loop early
break
}
entry := &LogEntry{
TimeReceived: timeReceived,
Sequence: startSeq,
EndSequence: endSeq,
}
heap.Push(&c.pendingLogs, entry)

// push what's left of seq range
if startSeq <= endSeq {
entry := &LogEntry{
TimeReceived: timeReceived,
Sequence: startSeq,
EndSequence: endSeq,
}
heap.Push(&c.pendingLogs, entry)
}
}

// Process unused sequence notification. Extracts sequence from docID and sends to cache for buffering
Expand Down Expand Up @@ -885,16 +840,23 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
isNext = oldestPending.Sequence == c.nextSequence

if isNext {
heap.Pop(&c.pendingLogs)
oldestPending = c._popPendingLog(ctx)
changedChannels = changedChannels.UpdateWithSlice(c._addToCache(ctx, oldestPending))
} else if oldestPending.Sequence < c.nextSequence {
// oldest pending is lower than next sequence, should be ignored
base.InfofCtx(ctx, base.KeyCache, "Oldest entry in pending logs %v (%d, %d) is earlier than cache next sequence (%d), ignoring as sequence has already been cached", base.UD(oldestPending.DocID), oldestPending.Sequence, oldestPending.EndSequence, c.nextSequence)
oldestPending = c._popPendingLog(ctx)

// If the oldestPending was a range that extended past nextSequence, update nextSequence
if oldestPending.IsUnusedRange() && oldestPending.EndSequence >= c.nextSequence {
c.nextSequence = oldestPending.EndSequence + 1
}
} else if len(c.pendingLogs) > c.options.CachePendingSeqMaxNum || time.Since(c.pendingLogs[0].TimeReceived) >= c.options.CachePendingSeqMaxWait {
// Skip all sequences up to the oldest Pending
c.PushSkipped(ctx, c.nextSequence, oldestPending.Sequence-1)
// disallow c.nextSequence decreasing
if c.nextSequence < oldestPending.Sequence {
c.nextSequence = oldestPending.Sequence
}
c.nextSequence = oldestPending.Sequence
} else {
// nextSequence is not in pending logs, and pending logs size/age doesn't trigger skipped sequences
break
}
}
Expand All @@ -905,6 +867,41 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
return changedChannels
}

// _popPendingLog pops the next pending LogEntry from the c.pendingLogs heap and returns it.
//
// When the popped entry is an unused sequence range and further entries remain in pendingLogs, a defensive
// duplicate check is made against the entry now at the head of the heap:
//   - range ends before the next entry's sequence: the range is returned unchanged.
//   - range starts at the next entry's sequence: the range is discarded and the result of popping again is returned.
//   - otherwise: the range's EndSequence is reduced so that it ends immediately before the next entry's sequence.
//
// Any duplication found is reported at info level. The next entry's DocID is wrapped in base.UD when logged,
// consistent with how _addPendingLogs logs DocID.
func (c *changeCache) _popPendingLog(ctx context.Context) *LogEntry {
	poppedEntry := heap.Pop(&c.pendingLogs).(*LogEntry)

	// Non-range entries, and a range that was the last pending entry, need no additional handling.
	if !poppedEntry.IsUnusedRange() || len(c.pendingLogs) == 0 {
		return poppedEntry
	}

	nextPendingEntry := c.pendingLogs[0]
	switch {
	case poppedEntry.EndSequence < nextPendingEntry.Sequence:
		// Popped range does not overlap with the next pending entry,
		// e.g. popped [15-20], nextPendingEntry is [25]
		return poppedEntry
	case poppedEntry.Sequence == nextPendingEntry.Sequence:
		// Next pending entry duplicates the start of the unused range: ignore the popped range and
		// return the next entry instead, e.g. popped [15-20], nextPendingEntry is [15]
		base.InfofCtx(ctx, base.KeyCache, "Unused sequence range in pendingLogs (%d, %d) has start equal to next pending sequence (%s, %d) - unused range will be ignored", poppedEntry.Sequence, poppedEntry.EndSequence, base.UD(nextPendingEntry.DocID), nextPendingEntry.Sequence)
		return c._popPendingLog(ctx)
	default:
		// Next pending entry falls inside the range: shorten the range to end just before it,
		// e.g. popped [15-20], nextPendingEntry is [18]
		base.InfofCtx(ctx, base.KeyCache, "Unused sequence range in pendingLogs (%d, %d) overlaps with next pending sequence (%s, %d) - unused range will be truncated", poppedEntry.Sequence, poppedEntry.EndSequence, base.UD(nextPendingEntry.DocID), nextPendingEntry.Sequence)
		poppedEntry.EndSequence = nextPendingEntry.Sequence - 1
		return poppedEntry
	}
}

func (c *changeCache) GetStableSequence(docID string) SequenceID {
// Stable sequence is independent of docID in changeCache
return SequenceID{Seq: c.LastSequence()}
Expand Down
Loading

0 comments on commit b6127c1

Please sign in to comment.