diff --git a/db/change_cache.go b/db/change_cache.go index e4b581c919..34fbc81f21 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -575,7 +575,6 @@ func (c *changeCache) releaseUnusedSequence(ctx context.Context, sequence uint64 } else { changedChannels.Add(unusedSeq) } - c.channelCache.AddUnusedSequence(change) if c.notifyChange != nil && len(changedChannels) > 0 { c.notifyChange(ctx, changedChannels) } @@ -598,7 +597,6 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen } changedChannels := c.processEntry(ctx, change) allChangedChannels = allChangedChannels.Update(changedChannels) - c.channelCache.AddUnusedSequence(change) if c.notifyChange != nil { c.notifyChange(ctx, allChangedChannels) } @@ -608,9 +606,6 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen // push unused range to either pending or skipped lists based on current state of the change cache allChangedChannels = c.processUnusedRange(ctx, fromSequence, toSequence, allChangedChannels, timeReceived) - // update high seq cached - c.channelCache.AddUnusedSequence(&LogEntry{Sequence: toSequence}) - if c.notifyChange != nil { c.notifyChange(ctx, allChangedChannels) } @@ -803,8 +798,9 @@ func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) []chann } delete(c.receivedSeqs, change.Sequence) - // If unused sequence or principal, we're done after updating sequence + // If unused sequence, notify the cache and return if change.DocID == "" { + c.channelCache.AddUnusedSequence(change) return nil } diff --git a/db/change_cache_test.go b/db/change_cache_test.go index 17374f692f..8a537c9e79 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -2538,7 +2538,7 @@ func TestReleasedSequenceRangeHandlingEverythingPending(t *testing.T) { assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) assert.Equal(c, uint64(2), testChangeCache.nextSequence) dbContext.UpdateCalculatedStats(ctx) - assert.Equal(c, int64(25), dbContext.DbStats.CacheStats.HighSeqCached.Value()) + assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.HighSeqCached.Value()) }, time.Second*10, time.Millisecond*100) } @@ -2644,7 +2644,7 @@ func TestReleasedSequenceRangeHandlingEverythingPendingLowPendingCapacity(t *tes defer testChangeCache.Stop(ctx) require.NoError(t, err) - // process unused sequence range + // process unused sequence range, will be sent to pending. Triggers seq 1 being sent to skipped testChangeCache.releaseUnusedSequenceRange(ctx, 2, 25, time.Now()) require.EventuallyWithT(t, func(c *assert.CollectT) { @@ -2755,7 +2755,7 @@ func TestReleasedSequenceRangeHandlingSingleSequence(t *testing.T) { assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) assert.Equal(c, uint64(1), testChangeCache.nextSequence) dbContext.UpdateCalculatedStats(ctx) - assert.Equal(c, int64(2), dbContext.DbStats.CacheStats.HighSeqCached.Value()) + assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.HighSeqCached.Value()) }, time.Second*10, time.Millisecond*100) // process change that should overload pending and push sequence 1 to skipped diff --git a/db/channel_cache.go b/db/channel_cache.go index 8dc64f252c..bcb35d19e5 100644 --- a/db/channel_cache.go +++ b/db/channel_cache.go @@ -193,7 +193,11 @@ func (c *channelCacheImpl) AddPrincipal(change *LogEntry) { // Add unused Sequence notifies the cache of an unused sequence update. Updates the cache's high sequence func (c *channelCacheImpl) AddUnusedSequence(change *LogEntry) { - c.updateHighCacheSequence(change.Sequence) + if change.EndSequence > 0 { + c.updateHighCacheSequence(change.EndSequence) + } else { + c.updateHighCacheSequence(change.Sequence) + } } // Adds an entry to the appropriate channels' caches, returning the affected channels. lateSequence